Programmer's Python Async - Sharing Data Pipes & Queues
Written by Mike James   
Tuesday, 18 July 2023
Article Index
Programmer's Python Async - Sharing Data Pipes & Queues
Queue Example
Pipes

If you are going in for process-based asychronous code you need to find ways to share data - it doesn't just happen as it does with threads. Find out how to share data safely between processes in this extract from Programmer's Python: Async.

Programmer's Python:
Async
Threads, processes, asyncio & more

Is now available as a print book: Amazon

pythonAsync360Contents

1)  A Lightning Tour of Python.

2) Asynchronous Explained

3) Processed-Based Parallelism
         Extract 1 Process Based Parallism
4) Threads
         Extract 1 -- Threads
5) Locks and Deadlock

6) Synchronization

7) Sharing Data
        Extract 1 - Pipes & Queues

8) The Process Pool
        Extract 1 -The Process Pool 1 ***NEW!

9) Process Managers

10) Subprocesses

11) Futures
        Extract 1 Futures

12) Basic Asyncio
        Extract 1 Basic Asyncio

13) Using asyncio
        Extract 1 Asyncio Web Client
14) The Low-Level API
       Extract 1 - Streams & Web Clients
Appendix I Python in Visual Studio Code

 

Processes have access to the same range of synchronization primitives as threads. You can use Lock, Rlock, Event, Semaphore, Barrier and Condition with processes in almost exactly the same way as with threads. What is very different, however, is that processes do not share global variables and thus there is very little to lock! Of course, for processes to work together towards some common objective they need to share some data and there are a number of different ways of doing this. There are two shared data structures, the Queue and the Pipe, which are easy to use and usually powerful enough for most problems. The Queue has the advantage of being usable by processes and threads. The Pipe is closer to the operating system.

Beyond these two data structures there are some more sophisticated and flexible options. You can use a shared area of memory to transfer data directly between any number of processes. This is made easier by the use of the ctypes module which allows the specification of Python types to C types.

Finally we have raw shared memory, which is very close to the way the hardware allows processes to share data. The only problem with this alternative is that everything is done in terms of bytes rather than data structures.

The Queue

The multiprocessing.Queue is an implementation of a First In First Out (FIFO) stack that can be shared between processes. It is implemented using a pipe, see later, and some locks. It is a shared data structure at a higher level than the Pipe. In particular, the items that you can add and remove from the queue are Python objects, not just basic data types. If you add an object to a Queue it is first pickled to reduce its size and automatically un-pickled when it is retrieved. This is explained in Programmer’s Python: Everything Is Data,ISBN: 978-1871962598.

 

A FIFO stack is similar to the standard queue that we are all used to. Items join the queue at that back and leave the queue at the front. This means that the first item to join the queue is the first item out of the queue, hence the name. To create a Queue you use the constructor:

q=multiprocessing.Queue(maxsize=0)

The two basic operations are:

  • q.put(object, block=True, timeout=-1)
  • object=q.get(block=True, timeout=-1)

The put operation adds the object to the tail of the queue and the get removes and returns an object from the head of the queue. You can specify a blocking or non-blocking operation and a timeout. A non-blocking operation returns at once with an object or it raises the Empty exception. Notice that both operations can block as locks are used to protect the queue from a data race caused by overlapping access from multiple threads. That is, at any given time only one thread can be getting or putting data from or to the queue. Another reason that the put operation can wait is if the Queue is full. The put will wait until either another process gets some data or a timeout expires when a Full exception is raised.

The Queue works in a fairly sophisticated way. As long as you don’t specify a maxsize in the constructor you can store as many items in it as there is memory available. When the first item is added to the Queue a thread is started which transfers the data to the pipe, see later, that the Queue is built on. This means that the put doesn’t have to wait and there is always a free place in the Queue for new data. If you do specify a maxsize parameter then the Queue can only hold that number of items – maxsize gives the number of items, not the memory allocated. In this case it is possible for a put to have to wait until a space in the Queue becomes available. Notice that because of the buffering provided by the thread it is possible for the state of the Queue to lag behind the put operation. For example, you can put an item to the Queue and then test to see if it is empty using empty() only to find that it is. A moment later the thread will send the item to the Queue and empty() will return False.



Last Updated ( Tuesday, 18 July 2023 )