Watch stdout and stderr of a subprocess simultaneously

How can I watch standard output and standard error of a long-running subprocess simultaneously, processing each line as soon as it is generated by the subprocess?

I don't mind using Python 3.6's async tools to write what I expect to be non-blocking async loops over each of the two streams, but that doesn't seem to solve the problem. The following code:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

outputs:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

I would instead expect it to output something like:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2


Solution 1:[1]

To accomplish this, you need a function that takes two async sequences and merges them, yielding items from whichever one produces a result first. With such a function available, run could look like this:

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())

A function like merge does not (yet) exist in the standard library, but the aiostream external library provides one. You can also write your own using an async generator and asyncio.wait():

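import asyncio

async def merge(*iterables):
    # Map each async iterator to its pending __anext__() future (None = none yet).
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        # Start fetching the next item from every iterator that isn't busy.
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it  # remember which iterator this future serves
                iter_next[it] = fut
        # Wake up as soon as any iterator has produced an item or finished.
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                # This iterator is exhausted; stop tracking it.
                del iter_next[fut._orig_iter]
                continue
            yield ret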
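To see the interleaving without a subprocess, here is a sketch that runs a variant of merge() over two timed async generators. The variant is an illustrative restructuring of the same asyncio.wait() technique, not the code above: it tracks tasks in a task-to-iterator dict instead of setting _orig_iter on each future, and cancels outstanding tasks when the generator is closed. _anext, ticker and demo are hypothetical helpers, asyncio.create_task() needs Python 3.7+, and the delays assume a lightly loaded machine.

```python
import asyncio

async def _anext(ait):
    # Wrap __anext__() in a plain coroutine so create_task() always accepts it.
    return await ait.__anext__()

async def merge(*iterables):
    # Variant of merge(): a dict maps each pending task to its iterator,
    # and leftover tasks are cancelled when the generator is closed.
    pending = {}
    for it in iterables:
        ait = it.__aiter__()
        pending[asyncio.create_task(_anext(ait))] = ait
    try:
        while pending:
            done, _ = await asyncio.wait(pending,
                                         return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                ait = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    continue  # this source is exhausted; don't re-arm it
                # Re-arm before yielding so this source keeps being read.
                pending[asyncio.create_task(_anext(ait))] = ait
                yield item
    finally:
        for task in pending:
            task.cancel()

async def ticker(name, delays):
    # Hypothetical source: yields "<name><n>" after each delay (in seconds).
    for n, delay in enumerate(delays, 1):
        await asyncio.sleep(delay)
        yield f"{name}{n}"

async def demo():
    # "a" emits at about 0.0 s and 0.3 s, "b" at about 0.15 s.
    return [item async for item in merge(ticker("a", [0, 0.3]),
                                         ticker("b", [0.15]))]

result = asyncio.run(demo())
print(result)  # ['a1', 'b1', 'a2']
```

Items come out in arrival order rather than source order, which is exactly what the stdout/stderr case needs.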

The run above still differs from your desired output in one detail: it does not distinguish output lines from error lines. This is easily fixed by tagging each line with an indicator:

async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())

Solution 2:[2]

It has occurred to me that there is actually a simpler solution, at least if the watching code doesn't need to run inside a single coroutine.

You can spawn two separate coroutines, one for stdout and one for stderr. Running them concurrently gives you the needed semantics, and you can use gather to await their completion:

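async def watch(stream, prefix=''):
    # Print each line as soon as it arrives on this stream.
    async for line in stream:
        print(datetime.now(), prefix, line.decode().strip())

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    # Watch both streams concurrently, then reap the finished process.
    await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))
    await p.wait()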
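A self-contained sketch of the two-coroutine approach follows. It assumes a Python child process (launched via sys.executable) stands in for the shell snippet from the question, so it runs without a POSIX shell; CHILD and collected are hypothetical names used for illustration. The -u flag matters: without it the child's stdout is block-buffered when piped, and "Out 1" would only arrive when the child exits.

```python
import asyncio
import sys
from asyncio.subprocess import PIPE
from datetime import datetime

# Hypothetical child program reproducing the question's timing.
CHILD = r'''
import sys, time
print("Out 1")
time.sleep(0.5)
print("Err 1", file=sys.stderr)
time.sleep(0.5)
print("Out 2")
'''

collected = []  # lines in the order they were received

async def watch(stream, prefix=''):
    async for line in stream:
        text = prefix + line.decode().strip()
        collected.append(text)
        print(datetime.now(), text)

async def run():
    # -u makes the child's stdout unbuffered so each line is sent immediately.
    p = await asyncio.create_subprocess_exec(
        sys.executable, '-u', '-c', CHILD, stdout=PIPE, stderr=PIPE)
    # Both watchers run concurrently, so stderr is handled as it arrives.
    await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E: '))
    return await p.wait()

returncode = asyncio.run(run())
```

The printed timestamps should be roughly 0.5 s apart, with the E: line between the two stdout lines.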

Solution 3:[3]

Here is an example that does not have any external dependencies:

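import asyncio
from asyncio.subprocess import PIPE

def h_out(s):
    print(f"[O] {s}")

def h_err(s):
    print(f"[E] {s}")

async def _rs(stream, cb, enc):
    # Read the stream line by line and hand each decoded line to the callback.
    while True:
        line = await stream.readline()
        if not line:  # b'' means EOF: the subprocess closed this stream
            break
        cb(line.decode(enc).rstrip())

async def main():
    cmd = ['tail', '-f', '/var/log/syslog']
    enc = 'utf-8'
    p = await asyncio.create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
    # Pump both streams concurrently. asyncio.wait() no longer accepts bare
    # coroutines (Python 3.11+), so gather() is used instead.
    await asyncio.gather(_rs(p.stdout, h_out, enc),
                         _rs(p.stderr, h_err, enc))
    await p.wait()

asyncio.run(main())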
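The heart of _rs is the readline() contract: it returns each line including its terminator, returns a final unterminated line as-is, and returns b'' only at EOF. Here is a test-only sketch that checks this without a subprocess by feeding an asyncio.StreamReader by hand; demo and lines are hypothetical names, and a list's append method stands in for the print callbacks.

```python
import asyncio

async def _rs(stream, cb, enc):
    # Same loop as above: readline() returns b'' only once EOF is reached.
    while True:
        line = await stream.readline()
        if not line:
            break
        cb(line.decode(enc).rstrip())

async def demo():
    reader = asyncio.StreamReader()
    # Simulate a process writing CRLF, LF and unterminated lines, then exiting.
    reader.feed_data(b"first\r\nsecond\nthird")
    reader.feed_eof()
    lines = []
    await _rs(reader, lines.append, 'utf-8')
    return lines

lines = asyncio.run(demo())
print(lines)  # ['first', 'second', 'third']
```

The rstrip() call is what makes CRLF output from Windows programs and LF output from Unix programs look the same to the callback.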

A full example of this code that works on Windows, Linux, BSD, etc. is available at github.com/JavaScriptDude/PyTail.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 user4815162342
Solution 3 Timothy C. Quinn