'Watch stdout and stderr of a subprocess simultaneously
How can I watch standard output and standard error of a long-running subprocess simultaneously, processing each line as soon as it is generated by the subprocess?
I don't mind using Python3.6's async tools to make what I expect to be non-blocking async loops over each of the two streams, but that doesn't seem to solve the problem. The below code:
import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime
async def run(cmd):
p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
async for f in p.stdout:
print(datetime.now(), f.decode().strip())
async for f in p.stderr:
print(datetime.now(), "E:", f.decode().strip())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run('''
echo "Out 1";
sleep 1;
echo "Err 1" >&2;
sleep 1;
echo "Out 2"
'''))
loop.close()
outputs:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1
While I expect it to output something like:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2
Solution 1:[1]
To accomplish this, you need a function that will take two async sequences and merge them, producing the results from either one or the other, as they become available. With such a function in stock, run
could look like this:
async def run(cmd):
p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
async for f in merge(p.stdout, p.stderr):
print(datetime.now(), f.decode().strip())
A function like merge
does not (yet) exist in the standard library, but the aiostream
external library provides one. You can also write your own using an async generator and asyncio.wait()
:
async def merge(*iterables):
iter_next = {it.__aiter__(): None for it in iterables}
while iter_next:
for it, it_next in iter_next.items():
if it_next is None:
fut = asyncio.ensure_future(it.__anext__())
fut._orig_iter = it
iter_next[it] = fut
done, _ = await asyncio.wait(iter_next.values(),
return_when=asyncio.FIRST_COMPLETED)
for fut in done:
iter_next[fut._orig_iter] = None
try:
ret = fut.result()
except StopAsyncIteration:
del iter_next[fut._orig_iter]
continue
yield ret
The above run
will still differ from your desired output in one detail: it will not distinguish between output and error lines. But this can be easily accomplished by decorating the lines with an indicator:
async def decorate_with(it, prefix):
async for item in it:
yield prefix, item
async def run(cmd):
p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
async for is_out, line in merge(decorate_with(p.stdout, True),
decorate_with(p.stderr, False)):
if is_out:
print(datetime.now(), line.decode().strip())
else:
print(datetime.now(), "E:", line.decode().strip())
Solution 2:[2]
It has occurred to me that there is actually a simpler solution to the problem, at least if the watching code is such that it doesn't need to be in a single coroutine invocation.
What you can do is spawn two separate coroutines, one for stdout and one for stderr. Running them in parallel will give you the needed semantics, and you can use gather
to await their completion:
def watch(stream, prefix=''):
async for line in stream:
print(datetime.now(), prefix, line.decode().strip())
async def run(cmd):
p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))
Solution 3:[3]
Here is an example that does noes not have any external dependencies:
def h_out(s):
print(f"[O] {s}")
def h_err(s):
print(f"[E] {s}")
async def _rs(stream, cb, enc):
while True:
line = await stream.readline()
if line:
line = line.decode(enc)
cb(line.rstrip())
else:
break
cmd = ['tail', '-f', '/var/log/syslog']
enc = 'utf-8'
p = await asyncio.create_subprocess_exec(*cmd
, stdout=PIPE, stderr=PIPE)
await asyncio.wait([_rs(p.stdout, h_out, enc)
,_rs(p.stderr, h_err, enc)])
await p.wait()
Full example of this code that works for Windows, Linux, BSD etc: github.com/JavaScriptDude/PyTail.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | user4815162342 |
Solution 3 | Timothy C. Quinn |