Programmer's Python: Async - The Process Pool
Written by Mike James   
Monday, 20 November 2023

Now that you have a pool of processes you need to submit jobs to it, and there is a range of Pool methods that let you do this. The Pool object can only be used by the process that created it – don’t try to call Pool methods from other processes. In this sense the process that creates the Pool object is in charge of what happens.

The most basic job-submitting function is apply_async:

apply_async(func, args, kwds, callback, error_callback)

This submits the callable, specified as func, with the optional arguments and keyword arguments, to a process in the pool. The optional callbacks can be used to deal with the result of the job, but there are better ways to do this. Notice that apply_async returns immediately and a pool process starts to run the job. This means you can use apply_async to run multiple jobs using pool processes.

There is also an apply method:

apply(func,args,kwds)

which waits until the pool process has finished before returning and so can only be used to run a single job at a time, as in the sketch below.
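As a rough sketch of the difference, apply might be used like this, where the worker add and its arguments are invented for illustration:

import multiprocessing.pool

def add(a, b, scale=1):
    # a hypothetical worker used to show args and kwds
    return (a + b) * scale

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    # apply blocks until the job has finished and returns its result
    print(p.apply(add, args=(2, 3), kwds={'scale': 10}))
    p.close()
    p.join()

This displays 50. By contrast, apply_async returns at once. For example, we can start two simple jobs and wait for them to finish: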

import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
    time.sleep(random.randrange(1, 4))
    print(multiprocessing.current_process().name)

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    p.apply_async(myProcess)
    p.apply_async(myProcess)
    p.close()
    p.join()

After a delay, this displays:

ForkPoolWorker-1
ForkPoolWorker-2

Note that the print functions used to display the names of the processes are subject to race conditions and should be protected by a lock.
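One way to do this – a minimal sketch, assuming the standard initializer pattern for handing each pool worker a shared lock – is:

import multiprocessing.pool
import multiprocessing

def init(l):
    # runs once in each worker and stores the shared lock in a global
    global lock
    lock = l

def myProcess():
    with lock:
        # only one process at a time can now write to stdout
        print(multiprocessing.current_process().name)

if __name__ == '__main__':
    l = multiprocessing.Lock()
    p = multiprocessing.pool.Pool(2, initializer=init, initargs=(l,))
    p.apply_async(myProcess)
    p.apply_async(myProcess)
    p.close()
    p.join()

The lock has to be passed via the initializer because a Lock cannot be sent to a pool worker as an ordinary argument.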

You could specify callback functions to receive the results of the process:

import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
    time.sleep(random.randrange(1, 4))
    return multiprocessing.current_process().name

def myCallback(result):
    print(result)

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    p.apply_async(myProcess, callback=myCallback)
    p.apply_async(myProcess, callback=myCallback)
    p.close()
    p.join()

The callback is run on a thread in the main process and, while it is running, no further results can be handled, so in general the callback should finish as quickly as possible to avoid holding up the main process. In this example this doesn’t matter as the main process is suspended anyway. You can set an error callback in the same way, as in the sketch after this paragraph. The problem with using callbacks is getting them to synchronize with the operation of the main process – this is a recurrent theme and the solution is to use something like a deferred object, also known as a future.
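For instance, here is a minimal sketch of an error callback in action, using a worker that deliberately raises an exception:

import multiprocessing.pool

def myProcess():
    # deliberately fail so that the error callback fires
    raise ValueError("something went wrong")

def myErrorCallback(error):
    # runs in the main process with the exception raised by the job
    print("error:", error)

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    p.apply_async(myProcess, error_callback=myErrorCallback)
    p.close()
    p.join()

This displays:

error: something went wrong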

Waiting for Pool Processes

Using apply_async is an easy way to get processes started; the real problem is waiting for them to finish. You can use join after a close to wait for all of the processes to end, but this isn’t usually what you want. All of the Pool job methods return an AsyncResult object, which can be used to inquire about the current status of a job and to get its result. The AsyncResult object is also interesting because it is the first example of something that programmers in other languages would call a promise and which Python terms a Future, more of which later.

 

The AsyncResult object is available for the main process to use, but it is also manipulated by the child process and thus provides a means of communication between the two.

The AsyncResult object has four methods:

  • get(timeout) - waits until the result is ready, or until the optional timeout expires, and returns the result or raises a TimeoutError

  • wait(timeout) - as for get, but doesn’t return a result

  • ready() - returns True if the job is complete and False otherwise

  • successful() - returns True if the job completed without raising an exception, False if it raised one, and itself raises a ValueError if called before the result is ready.
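As a minimal sketch of how these fit together, where the worker and its return value are invented for illustration:

import multiprocessing.pool
import time

def myProcess():
    time.sleep(2)
    return 42

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(1)
    asr = p.apply_async(myProcess)
    print(asr.ready())       # False, the job is still running
    asr.wait()               # block until the job completes
    print(asr.ready())       # True
    print(asr.successful())  # True, no exception was raised
    print(asr.get())         # 42
    p.close()
    p.join()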

Of these the most useful is get as it returns the result of the function that you are running using Pool.

For example:

import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
    time.sleep(random.randrange(1,4))
    return multiprocessing.current_process().name
if __name__ == '__main__':
    with multiprocessing.pool.Pool(2) as p:
        asr1 = p.apply_async(myProcess)
        asr2 = p.apply_async(myProcess)
        result1 = asr1.get()
        result2 = asr2.get()
        print(result1, result2)

The function now returns the name of the process running it and this is what get returns. We simply wait for the two jobs to complete and collect their results. In this case we can use a with statement because the calls to get ensure that both jobs have finished before the block ends, and hence the call to terminate made on exit is what we want. This example displays:

ForkPoolWorker-1 ForkPoolWorker-2

There is no easy way to wait for just one of a set of jobs to finish, or anything similar – the Pool isn’t intended for this sort of use. You can try to use knowledge of the internal workings of Pool to access its processes, but this tends to interfere with its overall operation.

The simplest solution is to use the ready method in a polling loop with a sleep to suspend the main process and allow the child processes access to the CPUs:

import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
    time.sleep(random.randrange(1, 6))
    return multiprocessing.current_process().name

if __name__ == '__main__':
    with multiprocessing.pool.Pool(2) as p:
        asr1 = p.apply_async(myProcess)
        asr2 = p.apply_async(myProcess)
        waiting = True
        while waiting:
            time.sleep(0.01)
            if asr1.ready():
                print(asr1.get())
                break
            if asr2.ready():
                print(asr2.get())
                break

An alternative is to use imap_unordered, which yields the results of jobs in the order in which they complete rather than the order in which they were submitted.
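A minimal sketch, where the worker now takes an argument because imap_unordered supplies each job with one item from an iterable:

import multiprocessing.pool
import multiprocessing
import random
import time

def myProcess(i):
    time.sleep(random.randrange(1, 6))
    return multiprocessing.current_process().name

if __name__ == '__main__':
    with multiprocessing.pool.Pool(2) as p:
        # results are yielded as each job finishes,
        # not in the order the jobs were submitted
        for result in p.imap_unordered(myProcess, range(4)):
            print(result)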



