Programmer's Python Async - Sharing Data Pipes & Queues
Written by Mike James   
Tuesday, 18 July 2023

Pipes

In most cases you are better off using the higher-level Queue to exchange complex data between processes, but the Queue is itself built on the lower-level Pipe, which is provided by the operating system. A Queue is buffered and, unless you restrict the number of items it can hold, a put should not block. In contrast, a Pipe has no buffer beyond the small one that the operating system provides and it is very possible for a writer to have to wait until a reader makes space in the Pipe. For all its disadvantages, a Pipe is about three times faster than a Queue.
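As a minimal sketch of what restricting the number of items means, the following creates a Queue limited to a single item and shows that a second non-blocking put is refused. The variable names are only illustrative and everything runs in one process to keep the example short:

```python
import multiprocessing
import queue

# A Queue restricted to one item (the name "bounded" is just illustrative).
bounded = multiprocessing.Queue(maxsize=1)
bounded.put("first")  # fits, so this returns at once

# A second put would have to wait for space; with block=False it raises
# queue.Full instead of waiting.
try:
    bounded.put("second", block=False)
    second_put_refused = False
except queue.Full:
    second_put_refused = True

first = bounded.get(timeout=5)  # removing the item frees the slot again

# Tidy up the queue's background feeder thread.
bounded.close()
bounded.join_thread()
```

With the default maxsize of zero there is no such limit and the put simply adds the item to the buffer.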

A Pipe really is like a data pipe that connects two processes. The Pipe has two ends and each end has a Connection object which the process can use to send and receive data. To create a Pipe you can use its constructor:

con1, con2 = multiprocessing.Pipe()

By default the Pipe is bi-directional and the connection objects con1 and con2 can be used to send and receive data. If you want a uni-directional pipe you can use:

con1, con2 = multiprocessing.Pipe(False)

and then con1 can only be used to receive data and con2 to transmit data.
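A short sketch of the uni-directional case, run in a single process for simplicity. The names receiver and sender are just illustrative replacements for con1 and con2:

```python
import multiprocessing

# Pipe(False) returns (receive-only end, send-only end), in that order.
receiver, sender = multiprocessing.Pipe(False)

sender.send("one way")
one_way = receiver.recv()

# Using an end in the wrong direction is rejected with an OSError.
try:
    receiver.send("wrong way")
    wrong_way_error = None
except OSError as err:
    wrong_way_error = err
```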

A Connection object has methods to send and receive Python objects:

Connection.send(obj)
obj = Connection.recv()

The Python objects are reduced to a sequence of bytes that can be sent over an operating system pipe by being pickled. If the object is too big, above around 32MB, you will get a ValueError exception.
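Because the object is pickled, what arrives at the other end is a copy rather than the original. A minimal sketch, again in one process and with made-up data:

```python
import multiprocessing

con1, con2 = multiprocessing.Pipe()

# An illustrative object built from picklable types.
record = {"id": 7, "tags": ["a", "b"], "point": (1.5, 2.5)}

con1.send(record)       # pickled and written to the pipe
received = con2.recv()  # read from the pipe and unpickled

same_value = received == record   # the contents survive the trip
same_object = received is record  # but it is a new object
```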

If you want to send raw bytes as data you can use:

Connection.send_bytes(buffer)
buffer=Connection.recv_bytes()

or:

Connection.recv_bytes_into(buffer)

You can specify an offset and size for the send operation and a maxlength for the receive operation. The same size limit of around 32MB applies to byte data transfer. The data in buffer is a bytes or bytearray object, explained in Programmer’s Python: Everything Is Data, ISBN: 978-1871962598.
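The byte-oriented methods can be sketched as follows. The payloads and buffer size are arbitrary choices made for illustration:

```python
import multiprocessing

con1, con2 = multiprocessing.Pipe()
payload = b"0123456789"

# Send the whole buffer as one message.
con1.send_bytes(payload)
whole = con2.recv_bytes()

# Send only part of it: offset 2, size 4.
con1.send_bytes(payload, 2, 4)
part = con2.recv_bytes()

# Receive into an existing writable buffer; the return value is the
# number of bytes in the message.
buf = bytearray(16)
con1.send_bytes(b"abc")
count = con2.recv_bytes_into(buf)

# An offset places the next message further along the same buffer.
con1.send_bytes(b"xyz")
count2 = con2.recv_bytes_into(buf, 3)
```

Each send_bytes call produces one complete message, so the receiver gets exactly the bytes of a single send and never a fragment or a mixture.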

If there is nothing to read from the Pipe, the recv methods will block until there is, or raise an EOFError if there is nothing left to read and the other end has been closed using Connection.close(). To discover if there is any data waiting to be read you can use Connection.poll(timeout).
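A small sketch of poll, and of what a reader sees once the sending end has been closed. It runs in a single process and the one-second timeout is an arbitrary choice:

```python
import multiprocessing

con1, con2 = multiprocessing.Pipe()

# With no timeout poll returns immediately.
empty_before = con2.poll()

con1.send("data")
ready = con2.poll(1.0)  # wait up to one second for something to read
item = con2.recv()

# Once the other end is closed and nothing is left to read,
# recv raises EOFError rather than blocking for ever.
con1.close()
try:
    con2.recv()
    saw_eof = False
except EOFError:
    saw_eof = True
```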

A Connection object can also be used as a context manager, as if you were working with a file. That is:

with con1:
    con1.send("Hello Pipe World")

is equivalent to:

con1.send("Hello Pipe World")
con1.close()

The best way to see a Pipe in action is to create a version of the previous example that used a Queue to transfer data between two processes:

import multiprocessing
from time import sleep

def addCount(con):
    for i in range(500):
        con.send(i)

def getCount(con,l):
    while True:
        sleep(0.005)
        i=con.recv()
        with l:
            print(i,multiprocessing.current_process().name)

if __name__ == '__main__':
    con1,con2=multiprocessing.Pipe()
    pLock=multiprocessing.Lock()
    p1=multiprocessing.Process(target=addCount,args=(con1,))
    p2=multiprocessing.Process(target=getCount,args=(con2,pLock))
    p3=multiprocessing.Process(target=getCount,args=(con2,pLock))
    p1.start()
    p2.start()
    p3.start()
    p2.join()

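The Pipe is bi-directional, i.e. duplex, and we pass the connection object for one end to the addCount function and for the other end to the two getCount processes. Notice that there can be any number of readers and writers at each end of the Pipe. Be aware, however, that the Python documentation warns that data may become corrupted if two processes try to read from, or write to, the same end at the same time, so in production code access to a shared end should be protected by a lock. The data in the Pipe isn’t duplicated to each process that reads it – any process reading an item from the Pipe removes it.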

A confusing part of using a Pipe is that it has two ends. In this case we are using the con1 to con2 direction and items that are sent using con1 are read using con2. To send data the other way a process would write to con2 and then other processes could read from con1. In this sense a Pipe behaves as if it were two queues, one running from con1 to con2 and the other from con2 to con1.
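The two directions can be seen in a few lines. This sketch stays within one process and the messages are invented for illustration:

```python
import multiprocessing

con1, con2 = multiprocessing.Pipe()  # duplex by default

con1.send("request")  # travels in the con1 to con2 direction
con2.send("reply")    # travels in the con2 to con1 direction

# Each message can only be read at the end opposite the one that sent it.
at_con2 = con2.recv()
at_con1 = con1.recv()

# Within one direction, items come out in the order they went in.
for n in range(3):
    con1.send(n)
in_order = [con2.recv() for _ in range(3)]
```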

The biggest problem with using Pipes is working out when there is data to read and how much. Using Python you can make this a simpler problem by sending and receiving objects, rather than unstructured bytes. A particular problem with this approach, however, is that if the process reading or writing an object is terminated the Pipe might be left in an unreadable state because it contains only part of an object.

Queues for Threads

There isn’t much point in using either a multiprocessing Queue or a Pipe to communicate between threads in the same process, as the operating system goes to great lengths to implement a pipe which isn’t needed. Also, given that threads share the same environment, all that is needed is a data structure with suitable locks or, equivalently, atomic operations. If you don’t want to do it yourself you can use the queue module. This has a set of ready-made queues with updates automatically locked. There are three basic queue objects:

  • queue.Queue(maxsize=0)
    A FIFO queue that works exactly like multiprocessing.Queue.
  • queue.LifoQueue(maxsize=0)
    A queue or stack that works exactly like multiprocessing.Queue except the last item added is the first to be retrieved, hence LIFO.
  • queue.PriorityQueue(maxsize=0)
    A priority queue that stores items and returns them in the order of priority. That is, element x is returned before element y if x<y. If you use a tuple like (priority, data) then elements with the smallest value of priority are returned first and if there is more than one they are returned in data order.

In addition there is also a queue.SimpleQueue class which lacks some of the methods of the other queues.
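The difference between the three queue types is easiest to see by putting the same items into each and reading them back. In this sketch drain is a hypothetical helper written for the example, not part of the queue module:

```python
import queue

def drain(q):
    """Remove and return everything currently in q (illustrative helper)."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for item in (3, 1, 2):
    fifo.put(item)
    lifo.put(item)
    prio.put(item)

fifo_order = drain(fifo)  # [3, 1, 2] - order of arrival
lifo_order = drain(lifo)  # [2, 1, 3] - last in, first out
prio_order = drain(prio)  # [1, 2, 3] - smallest first

# With (priority, data) tuples, ties on priority fall back to data order.
tasks = queue.PriorityQueue()
tasks.put((2, "write"))
tasks.put((1, "plan"))
tasks.put((1, "design"))
task_order = drain(tasks)
```

Using empty to decide when to stop is only safe here because a single thread is involved. With several threads another consumer could remove an item between the test and the get.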

It is also worth mentioning that collections.deque is thread-safe and you can use its append and pop methods without additional locking. To know more refer to Programmer’s Python: Everything Is Data.
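As a rough illustration, the following sketch has four threads append to one deque with no explicit lock. The labels and counts are arbitrary and producer is a name made up for the example:

```python
import threading
from collections import deque

shared = deque()

def producer(label, count):
    # Each append is thread-safe, so no lock is needed here.
    for n in range(count):
        shared.append((label, n))

threads = [threading.Thread(target=producer, args=(label, 1000))
           for label in "ABCD"]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(shared)  # no appends were lost

# pop takes from the right (LIFO); popleft takes from the left (FIFO).
newest = shared.pop()
oldest = shared.popleft()
remaining = len(shared)
```

Only the individual operations are safe in this way. A sequence such as testing the length and then popping still needs a lock if other threads might intervene.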

In chapter but not in this extract

  • Shared Memory
  • Shared ctypes
  • Raw Shared Memory
  • Shared Memory Manager
  • Computing Pi

Summary

  • Processes don’t share data as a matter of course because they don’t share memory. Each process runs in its own allocated memory.

  • The multiprocessing Queue is the highest level way of sharing data. It is a FIFO queue and is very easy to use because it is buffered. Any number of processes can add or remove data from the queue.

  • A standard Queue data structure can be used to share data between threads.

  • The Pipe is a lower level implementation of sharing between processes and is a Python version of a facility provided by the operating system.

  • A Pipe is bi-directional and you can read and write at each end of the pipe using connection objects. Data written to one end of the pipe can be read from the other end.

  • Any number of processes can have access to either end of the Pipe.

  • An equally fundamental way of sharing data is shared memory and this is supported in hardware and by the operating system.

  • You can share a block of memory and use it to exchange data between any number of processes.

  • Python lets you use the ctypes module to transfer data using shared memory. You have to convert Python data structures to C data structures to transfer via shared memory.

  • There is also a more basic way to access shared memory in its raw form. The only problem is that the data takes the form of a byte sequence and so you have to perform the conversion between Python data and byte data manually.

  • A slightly easier to use, but still restricted, form of raw shared memory is ShareableList, which lets you treat the buffer as an array of limited types – int, float, string and bytes.

Programmer's Python:
Async
Threads, processes, asyncio & more



Last Updated ( Tuesday, 18 July 2023 )