Programmer's Python Async - Sharing Data Pipes & Queues
Written by Mike James
Tuesday, 18 July 2023
Page 3 of 3
Pipes

In most cases you are better off using the higher-level Queue to exchange complex data between processes, but the Queue makes use of the lower-level Pipe, which is provided by the operating system. A Queue is buffered and, unless you restrict the number of items it can hold, a put should not block. In contrast, a Pipe has no buffering beyond what the operating system provides, and it is quite possible for a writer to have to wait until a reader makes space in the Pipe. For all its disadvantages, a Pipe is about three times faster than a Queue.

A Pipe really is like a data pipe that connects two processes. The Pipe has two ends and each end has a Connection object which the process can use to send and receive data. To create a Pipe you call multiprocessing.Pipe(), which returns the two Connection objects:

    con1, con2 = multiprocessing.Pipe()

By default the Pipe is bi-directional and the connection objects con1 and con2 can both be used to send and receive data. If you want a uni-directional pipe you can use:

    con1, con2 = multiprocessing.Pipe(False)

and then con1 can only be used to receive data and con2 can only be used to send data.

A Connection object has methods to send and receive Python objects:

    Connection.send(obj)
    obj = Connection.recv()

The Python objects are pickled, which reduces them to a sequence of bytes that can be sent over an operating system pipe. If the object is too big, above around 32MB, you will get a ValueError exception.

If you want to send raw bytes as data you can use:

    Connection.send_bytes(buffer)
    buffer = Connection.recv_bytes()

or:

    Connection.recv_bytes_into(buffer)

You can specify an offset and size for the send operation and a maxlength for the receive operation. The same size limit of around 32MB applies to byte data transfer. The data in buffer is a bytes or bytearray object, explained in Programmer’s Python: Everything Is Data, ISBN: 978-1871962598.

If there is nothing to read from the Pipe, the recv methods block until there is. If the other end has been closed using Connection.close() and nothing is left to read, they raise an EOFError exception instead.
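As a rough illustration of the Connection methods described above, the following sketch uses both ends of a Pipe within a single process. This only works because the messages are small enough to sit in the operating system's buffer; in real use each end would be handed to a different process. The function name pipe_roundtrip and the sample values are assumptions made for this example and are not part of the original text.

```python
import multiprocessing


def pipe_roundtrip():
    """Exercise the basic Connection methods; returns what was received."""
    # Duplex pipe: either end can send and receive.
    con1, con2 = multiprocessing.Pipe()
    con1.send({"count": 42})        # the object is pickled before sending
    obj = con2.recv()               # ...and unpickled on receipt
    con2.send_bytes(b"raw bytes")   # raw bytes travel in the other direction
    raw = con1.recv_bytes()
    con1.close()
    con2.close()

    # One-way pipe: the first connection can only receive,
    # the second can only send.
    rx, tx = multiprocessing.Pipe(False)
    tx.send_bytes(b"0123456789", 2, 4)   # offset 2, size 4
    part = rx.recv_bytes()

    # Once the sending end is closed and the pipe is empty,
    # recv raises EOFError instead of blocking.
    tx.close()
    try:
        rx.recv()
        ended = False
    except EOFError:
        ended = True
    rx.close()
    return obj, raw, part, ended


if __name__ == "__main__":
    print(pipe_roundtrip())
```

Catching EOFError in this way gives a reader a simple means of detecting that the writer has finished, provided every copy of the sending end has been closed.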
To discover if there is any data waiting to be read you can use Connection.poll(timeout), which returns True if there is. A Connection object can also be used as a context manager, as if you were working with a file. That is:

    with con1:
        con1.send("Hello Pipe World")

is equivalent to:

    con1.send("Hello Pipe World")
    con1.close()

The best way to see a Pipe in action is to create a version of the previous example that used a Queue to transfer data between two processes:

    import multiprocessing
    from time import sleep

    def addCount(con):
        for i in range(500):
            con.send(i)

    def getCount(con,l):
        while True:
            sleep(0.005)
            i=con.recv()
            with l:
                print(i,multiprocessing.

The Pipe is bi-directional, i.e. duplex, and we pass the connection object for one end to the addCount function and for the other end to the two getCount processes. Notice that there can be any number of readers and writers at each end of the Pipe, although the Python documentation warns that data may become corrupted if two processes read from or write to the same end at the same time, so such access should be coordinated, for example with a lock. The data in the Pipe isn’t duplicated to each process that reads it; any process reading an item from the Pipe removes it.

A confusing part of using a Pipe is that it has two ends. In this case we are using the con1 to con2 direction, and items that are sent using con1 are read using con2. To send data the other way, a process would write to con2 and then other processes could read from con1. In this sense a Pipe behaves as if it were two queues, one formed by con1 and con2 and the other by con2 and con1.

The biggest problem with using Pipes is working out when there is data to read and how much. Using Python you can make this a simpler problem by sending and receiving objects, rather than unstructured bytes. A particular problem with this approach, however, is that if the process reading or writing an object is terminated, the Pipe might be left in an unreadable state because it contains only part of an object.

Queues for Threads

There isn’t much point in using either a multiprocessing Queue or a Pipe to communicate between threads in the same process, as the operating system goes to great lengths to implement a pipe that isn’t needed.
Also, given that threads share the same environment, all that is needed is a data structure with suitable locks or, equivalently, atomic operations. If you don’t want to do it yourself you can use the queue module. This has a set of ready-made queues with updates automatically locked. There are three basic queue objects: queue.Queue (first in, first out), queue.LifoQueue (last in, first out) and queue.PriorityQueue (lowest-valued entry first).
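A minimal sketch of the queue module's three standard classes, queue.Queue, queue.LifoQueue and queue.PriorityQueue, showing that they differ only in the order in which items are retrieved, followed by a small example of worker threads sharing a queue. The helper names retrieval_orders and threaded_sum, and the use of None as a stop sentinel, are assumptions made for illustration.

```python
import queue
import threading


def retrieval_orders():
    """Put 3, 1, 2 into each kind of queue and record the order they come out."""
    orders = {}
    for cls in (queue.Queue, queue.LifoQueue, queue.PriorityQueue):
        q = cls()
        for item in (3, 1, 2):
            q.put(item)
        out = []
        while not q.empty():     # safe here: only one thread is using q
            out.append(q.get())
        orders[cls.__name__] = out
    return orders


def threaded_sum(n_items=100, n_workers=2):
    """Sum 0..n_items-1 using worker threads fed from a shared queue.Queue."""
    q = queue.Queue()
    total = 0
    lock = threading.Lock()      # protects total; the queue locks itself

    def worker():
        nonlocal total
        while True:
            item = q.get()       # blocks until an item is available
            if item is None:     # hypothetical sentinel: no more work
                break
            with lock:
                total += item

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for i in range(n_items):
        q.put(i)
    for _ in threads:
        q.put(None)              # one sentinel per worker
    for t in threads:
        t.join()
    return total


if __name__ == "__main__":
    print(retrieval_orders())
    print(threaded_sum())
```

Note that only the shared total needs an explicit lock here; the put and get calls are already made safe by the queue itself.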
In addition there is also a queue.SimpleQueue class, which lacks some of the methods of the other queues. It is also worth mentioning that collections.deque is thread-safe and you can use its append and pop methods without additional locking. To know more, refer to Programmer’s Python: Everything Is Data.

In chapter but not in this extract
Summary
Last Updated ( Tuesday, 18 July 2023 )