Programmer's Python: Async - Futures
Written by Mike James
Monday, 28 August 2023
Using a Process Manager to Share Resources

If you want to use a more sophisticated resource-sharing method then you need to create a manager and pass the proxy objects it creates to the processes:

import concurrent.futures
import multiprocessing
import multiprocessing.managers
import time
import ctypes

def counter(count, lock):
    for i in range(10000):
        with lock:
            temp = count.value + 1
            count.value = temp

if __name__ == '__main__':
    with multiprocessing.Manager() as man:
        # the lines from here on reconstruct the truncated listing,
        # following the description in the text
        count = man.Value(ctypes.c_int, 0)
        lock = man.Lock()
        with concurrent.futures.ProcessPoolExecutor(2) as execute:
            f1 = execute.submit(counter, count, lock)
            f2 = execute.submit(counter, count, lock)
        print(count.value)

In this case we need to pass both a Value object and a Lock object because the manager's Value object doesn't have a built-in lock. The lock is used in the with statement in the counter function. If you remove it you will find that the result is less than 20000 due to race conditions.

Notice that we don't need to use the initializer as the shared objects are now passed as parameters. To be more accurate, the proxies to the shared objects are passed as parameters, and these are standard Python objects which are pickleable. The proxy objects connect to the manager's server running in a separate process. This means that not only do we have one more process running, we also have the overhead of using a pipe to allow the proxy objects to communicate with the shared object. As a result this is slow: compared to the use of the basic multiprocessing shared objects it takes more than ten times as long to complete.

Sharing Futures and Deadlock

There is another difference between using threads and processes when it comes to futures. In a threaded environment futures are generally accessible by more than one thread, because they are usually global to make sure that they live long enough to resolve and deliver their result. What this means is that not only can the use of locks result in deadlock, so can the use of futures by multiple threads. For example, consider what happens if we define two functions, taskA which waits for Future f2 and taskB which waits for Future f1:

import concurrent.futures
import time

def taskA():
    time.sleep(1)
    ans = f2.result()
    return ans

def taskB():
    time.sleep(1)
    ans = f1.result()
    return ans

with concurrent.futures.ThreadPoolExecutor(2) as execute:
    # reconstructed continuation of the truncated listing:
    # both futures are global, so each task can see the other's
    f1 = execute.submit(taskA)
    f2 = execute.submit(taskB)
    print(f1.result())

Of course, the result is deadlock. The sleep at the start of taskA is necessary to allow the main thread to submit taskB and create f2 before taskA tries to use it. This may be a contrived example, but in real life deadlocks due to waiting on futures happen in ways that are much more difficult to detect. Notice that this can't happen with process-based futures because these aren't shared between processes. If you avoid accessing futures on threads that didn't create them then you can avoid deadlock.

Computing Pi with Futures

Computing pi using futures is very similar to the previous example using a process pool. It might seem more instructive to implement the example using a thread pool, but as this would show no speed advantage due to the GIL, a version using the process executor is more interesting. Converting the code to use a thread pool is then a matter of changing one line:

import concurrent.futures
import time

def myPi(m, n):
    pi = 0
    for k in range(m, n + 1):
        s = 1 if k % 2 else -1
        pi += s / (2 * k - 1)
    return pi * 4

if __name__ == '__main__':
    N = 10000000
    with concurrent.futures.ProcessPoolExecutor(2) as execute:
        # reconstructed continuation of the truncated listing:
        # split the series between two processes and time the run
        t1 = time.perf_counter()
        f1 = execute.submit(myPi, 1, N // 2)
        f2 = execute.submit(myPi, N // 2 + 1, N)
        PI = f1.result() + f2.result()
        t2 = time.perf_counter()
    print((t2 - t1) * 1000)
    print(PI)

Notice that now we pass the parameters to the called function without the need to use a list or tuple, and the calls to result make the main process wait until the relevant future resolves. If there is an exception in the function it is passed to the main process.
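To see what this means in practice, here is a minimal sketch of exception propagation; the fail function and the ZeroDivisionError it raises are invented for illustration and are not part of the original example:

import concurrent.futures

def fail(n):
    # raises ZeroDivisionError in the worker process when n is 0
    return 1 / n

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(1) as execute:
        f = execute.submit(fail, 0)
        try:
            # the worker's exception is stored on the future and
            # re-raised here in the main process
            f.result()
        except ZeroDivisionError as e:
            print("caught in main process:", e)

The exception is pickled in the worker, stored on the future and only re-raised when result() is called, so the point of failure in the main process is the call to result, not the call to submit.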
Also notice that in the pi example no locking is required, as the workers do not make use of shared resources and they return their results using a future. If you change the with to read:

with concurrent.futures.ThreadPoolExecutor(2) as execute:

then, with no other changes, you have a version which works with threads. This takes more than twice as long to run as the process version, which is what you would expect.

Process Pool or Concurrent Futures?

Python seems to have two modules which do similar things. The multiprocessing.pool module provides a futures-like approach using AsyncResult and a wide range of map-like operations, but it doesn't do a good job of supporting a thread pool equivalent. The concurrent.futures module, on the other hand, provides a more complete futures approach, and both process and thread pools are well supported. You can also make use of multiprocessing managers, which isn't surprising as the sharing-by-proxy approach works with almost any type of process, irrespective of how it has been created. In most cases the best choice is concurrent.futures backed up by multiprocessing. Only use multiprocessing.pool if you need the more advanced map-style functions.
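To make the comparison concrete, here is a sketch of the pi computation rewritten for multiprocessing.Pool; this listing is not from the original text and simply reuses the myPi function from above:

import multiprocessing

def myPi(m, n):
    pi = 0
    for k in range(m, n + 1):
        s = 1 if k % 2 else -1
        pi += s / (2 * k - 1)
    return pi * 4

if __name__ == '__main__':
    N = 10000000
    with multiprocessing.Pool(2) as pool:
        # apply_async returns an AsyncResult, the Pool's rough
        # equivalent of a future
        r1 = pool.apply_async(myPi, (1, N // 2))
        r2 = pool.apply_async(myPi, (N // 2 + 1, N))
        # get() blocks like result() and re-raises any worker exception
        PI = r1.get() + r2.get()
    print(PI)

The structure is the same, but notice that the parameters have to be packed into a tuple for apply_async, and the blocking call is get() rather than result(), which illustrates why concurrent.futures is usually the more convenient choice.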