Programmer's Python: Async - Futures
Written by Mike James   
Monday, 28 August 2023
Article Index
Programmer's Python: Async - Futures
Executors
Locking and Sharing Data
Using a Process Manager to Share Resources

Executors

The concurrent.futures module supports both a thread and a process pool and provides easy ways to create and make use of them via the idea of an executor. There are two executors, ProcessPoolExecutor and ThreadPoolExecutor which work in the same way as the ProcessPool described in Chapter 8. To create an Executor you use the appropriate constructor:

concurrent.futures.ThreadPoolExecutor(max_workers=None,
thread_name_prefix='')

or:

concurrent.futures.ProcessPoolExecutor(max_workers=None)

In either case if max_workers isn’t specified the number of processors available is used.

Although currently not documented you can also specify two additional parameters, initializer = function and initargs = arguments tuple which call function with the specified arguments before each thread/process starts. This is a way of getting Python global objects set up before a process runs. It is less useful for threads as they already share global objects. Notice that the initializer is only called once when the thread/process is created as part of the pool. If the pool is reused to run other functions the initializer is not rerun.

Once you have an Executor you can run a single function using:

Executor.submit(function, args, kwargs)

As long as a thread or process is available for use, the function runs immediately, otherwise it queues and waits for a thread or process to become available. A Future is returned immediately and you can use this to monitor the function and get a result. Notice that the parameters aren’t passed as an explicit list and dictionary as with other similar methods.

That is:

Executor.submit(function, 1, 2, myParam = ”3”)

calls:

function(1, 2, myParam = ”3”)

If you want to run multiple functions in one operation then you can use:

Executor.map(function, iterables, timeout=None,
chunksize=1)

This is similar to map_async, see Chapter 8, but in this case you can specify more than one iterable and these will be used to supply more than one parameter to the function. chunksize works in the same way, it determines the number of tasks submitted to each process – it is ignored for threads. An iterator over the results is returned and this gives the next result available and will time out if specified. Also the multiprocessing.pool module has a range of much more advanced map-like functions.

Finally we have Executor.shutdown(wait = True) which closes the executor and frees resources. Calls to submit or map raise RuntimeError after the Executor has been shut down. If wait is True the shutdown will block until all Futures are resolved. If it is False then it returns immediately, but the Python program will not exit until all futures have resolved.

You can use an Executor in a with clause:

with ThreadPoolExecutor() as e:
	e.submit(myfunction)

This creates the Executor and shuts it down with wait = True – i.e. it waits for all of the futures to resolve.

Neither the thread or the process Executor help with the problem of shared resources, race conditions and deadlock. You need to use the techniques we have outlined in previous chapters to share data safely.

I/O-Bound Example

Using concurrent.futures is very like using processpool but you also have a threadpool and a better Future at your disposal. To demonstrate how similar they are, consider the problem of using a threadpool to download some HTML files:

import concurrent.futures
import time
import urllib.request
def download():
    with urllib.request.urlopen(
'http://www.example.com/') as f: html= f.read().decode('utf-8') return html
with concurrent.futures.ThreadPoolExecutor()
as executor: t1=time.perf_counter() f1 = executor.submit(download) f2 = executor.submit(download) t2=time.perf_counter() print((t2-t1)*1000) print(f1.result()[:25]) print(f2.result()[:25])

This is very similar to the earlier example, but now we have the advantage of having a Future returned from the download function. The download function returns a string with all of the HTML from the site and the print just displays the first 25 characters. What is remarkable is that you can convert this program to work with processes rather than threads by changing the with clause to read:

with concurrent.futures.ProcessPoolExecutor() 
as executor:

If you are not using a fork you will also need to add:

if __name__ == '__main__':

before the with to stop the main program running in the child process.

Waiting On Futures

Futures also make waiting for results easier. You can use the wait function to wait for the first completed, first exception or all completed:

concurrent.futures.wait(listOfFutures, timeout = None,
return_when = ALL_COMPLETED)

This waits for the futures listed to resolve according to the setting of return_when:

  • FIRST_COMPLETED first future to finish or be canceled

  • FIRST_EXCEPTION first future to finish by raising an exception
    (if no future raises an exception it is equivalent to ALL_COMPLETED)

  • ALL_COMPLETED wait for all Futures to resolve

The function returns a named tuple (done, not-done) with each item being a set containing the resolved and the unresolved futures respectively. Notice that the not-done set includes all of the futures that correspond to tasks that are still running. For example, to wait for the first thread to download a web page, we would change the main program to:

t1=time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() 
as executor: f1 = executor.submit(download) f2 = executor.submit(download) t2=time.perf_counter() print((t2-t1)*1000) res=concurrent.futures.wait([f1,f2],
return_when=concurrent.futures.FIRST_COMPLETED) for f in res.done: print(f.result()[:25])

You can also use as_complete to deal with tasks as they complete:

iter=concurrent.futures.as_complete(listOfFutures,
timeout=None)

This returns an iterator which can be used to step through futures as they resolve, for example:

with concurrent.futures.ThreadPoolExecutor() 
as executor: f1 = executor.submit(download) f2 = executor.submit(download) for f in concurrent.futures.as_completed([f1,f2]): print(f.result()[:25])

In chapter but not in this extract

  • Future Done Callbacks
  • Dealing With Exceptions


Last Updated ( Monday, 28 August 2023 )