Programmer's Python: Async - Subprocesses
Written by Mike James   
Monday, 24 June 2024
Article Index
Programmer's Python: Async - Subprocesses
Input/Output
Interaction
Non-Blocking Read Pipe
Program listing

Non-Blocking Read Pipe

What we really need to stop all of these deadlocks caused by pipe buffering is a non-blocking read. The problem is that the subprocess module doesn’t support non-blocking pipe operations, although asyncio does. However, as we know all about threading it is easy to change a blocking function call into a non-blocking call. The trick is to run the blocking function on another thread and provide a way for the main thread to check that there are some results ready to be processed.

In the case of pipe.read the job is very simple because we can arrange for a new thread to always read ahead by one character and have that character ready for the main thread to consume. As there is a good deal of static data needed to maintain the state, it is better to use an object rather than a function:

class AsyncReader():
def __init__(self,stream): self._stream = stream self._char = "" self.t = threading.Thread(
target=self._get,daemon = True) self.t.start()

The object is linked to a particular pipe via the stream parameter, which is specified as part of the constructor and saved for later. The initializer creates a thread which attempts to read the pipe even before it has been requested:

    def _get(self):
        self._char=self._stream.read(1)

This is run on a separate thread and so it simply waits until a single character is ready to read. It doesn’t matter how long it waits as it isn’t holding up the main thread. When a character is ready it stores it in _char and the thread comes to an end. What this means is that if the thread is still running we haven’t read a character and if it has terminated a character is waiting. Notice that we don’t have to lock the access to _char as only this new thread is accessing it. Now we can write a read method which doesn’t block:

    def get(self):     
self.t.join(0.04)
if self.t.is_alive():
return ""
else:
result=self._char
self.t=threading.Thread(
target=self._get,daemon=True)
self.t.start()
return result

First we join the thread for four hundredths of a second. If the thread has finished the join continues without waiting. The reason we wait is to give the script at the other end of the pipe time to send a character. Without the join we could read the pipe too fast and conclude that the message being sent was complete. Next we check to see if the thread is still running. If it is then it is still waiting for the pipe to produce a character and we return a null string to indicate that there is no character available. If it has finished we create a new thread to read the next character and return the character stored in _char.

The reason for storing _char in the local variable result before starting the thread again is to avoid any race condition without locking. At this point the thread isn’t running and so we can safely read the data in _char without locking. Once the thread has started then it could change the value in _char before the method returns and so lose a character. Doing things in this order means that we can avoid locking, employing a lock-free algorithm.

The get method reads a single character, but we generally want to read a complete message from the child process. Usually a complete message ends with a newline but not always. Sometimes the child process will simply display a message and then wait for the user to input something on the same line. What does mark the end of a message that needs a response is that the flow of characters stops while the user is given a chance to type a response. This “pause” detection is already included in the get method. Now we can use it to write a method that builds up a complete message:

    def readmessage(self):
ans=""
while True:
char=self.get()
if char=="":
break
ans+=char
return ans

Now we have an object which provides a non-blocking read of a pipe we can put this together to read a message from the child process:

p = subprocess.Popen(["myscript.bat"],
stdout=subprocess.PIPE,stdin=subprocess.PIPE,
bufsize=1,universal_newlines=True, text=True) aRead=AsyncReader(p.stdout) ans=aRead.readmessage() print("message",ans) p.stdin.write("mike\n") ans=aRead.readmessage() print("message",ans)

This now works irrespective on whether or not the message ends with a newline and it works under Windows and Linux (with a change to the name of the script).



Last Updated ( Monday, 24 June 2024 )