Можно ли использовать подпроцесс asyncio с contextmanager?

В python (3.7+) я пытаюсь запустить подпроцесс в качестве менеджера контекста, асинхронно передавая потенциально большие объемы stdout. Проблема в том, что я не могу заставить тело contextmanager работать асинхронно с обратным вызовом stdout. Я пытался использовать потоки, запуская там асинхронную функцию, но тогда я не мог понять, как вернуть объект Process обратно в менеджер контекста.

Итак, вопрос: как я могу получить объект асинхронного процесса из менеджера контекста в основном потоке во время его работы? То есть я хотел бы получить уже и в настоящее время запущенный процесс из open_subprocess() до того, как он будет завершен в следующем коде.

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd,
        *args,
        stdout=asyncio.subprocess.PIPE)
    read = read_stream(proc, proc.stdout, stdout_callback)
    await asyncio.wait([read])
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    proc_coroutine = stream_subprocess(
        cmd,
        *args,
        stdout_callback=stdout_callback)
    # The following blocks until proc has finished
    # I would like to yield proc while it is running
    proc = asyncio.run(proc_coroutine)
    yield proc
    proc.terminate()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc:
        # The following code only runs after proc completes
        # but I would expect these print statements to
        # be interleaved with the output from the subprocess
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

person Johann    schedule 01.08.2019    source источник


Ответы (2)


Asyncio обеспечивает параллельное выполнение за счет приостановки всего, что может заблокировать. Чтобы это работало, весь код должен быть внутри обратных вызовов или сопрограмм и воздержитесь от вызова блокирующих функций, таких как time.sleep(). Помимо этого, в вашем коде есть некоторые другие проблемы, например, await asyncio.wait([x]) эквивалентно await x, а это означает, что open_subprocess не будет выполняться до тех пор, пока не будет выполнено все чтение потока.

Правильный способ структурировать код — переместить код верхнего уровня в async def и использовать диспетчер асинхронного контекста. Например:

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    yield proc
    if proc.returncode is None:
        proc.terminate()
        await proc.wait()

async def main():
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

asyncio.run(main())

Если вы настаиваете на смешивании синхронизирующего и асинхронного кода, вам необходимо полностью разделить их, запустив цикл обработки событий asyncio в отдельном потоке. Тогда ваш основной поток не сможет напрямую обращаться к асинхронным объектам, таким как proc, потому что они не являются потокобезопасными. Вам нужно последовательно использовать call_soon_threadsafe и run_coroutine_threadsafe для связи с циклом событий.

Этот подход сложен и требует взаимодействия между потоками и работы с циклами событий, поэтому я бы не рекомендовал его, кроме как в качестве учебного упражнения. Не говоря уже о том, что если вы используете другой поток, вам вообще не нужно возиться с asyncio — вы можете напрямую выполнять вызовы синхронизации в другом потоке. Но, сказав это, вот возможная реализация:

import asyncio
import contextlib
import concurrent.futures
import threading

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        proc_data_future.set_exception(e)
        raise
    proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    loop = asyncio.new_event_loop()
    # needed to use asyncio.subprocess outside the main thread
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    proc_data = proc_data_future.result()
    yield proc_data
    async def terminate(proc):
        if proc.returncode is None:
            proc.terminate()
            await proc.wait()
    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc_data["returncode"]}')
person user4815162342    schedule 02.08.2019
comment
Ключевой вывод, который дает этот ответ, заключается в том, что нет необходимости ждать созданных задач. Я думал, что должен явно ждать для всех задач, которые должны выполняться в цикле событий, в частности, для задачи, созданной из сопрограммы read_stream. Я до сих пор не уверен, почему это так. - person Johann; 02.08.2019
comment
@Johann Ваша интуиция не совсем неуместна - это действительно хорошая идея в конечном итоге дождаться порожденной задачи, иначе необработанные исключения, вызванные задачами, будут потеряны. Но определенно верно, что задачу не нужно ждать сразу — в этом случае может быть хорошей идеей ждать задачу, созданную из read_stream() после yield. См. эту статью. более длинный трактат на эту тему. - person user4815162342; 02.08.2019

Способ работы с @contextlib.asynccontextmanager и подпрограммой Process.wait() (ожидает дочерний процесс для завершения, устанавливает и возвращает атрибут returncode):

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if not data:
            break
        callback(data.decode().rstrip())


async def stream_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(cmd, *args,
                                                stdout=asyncio.subprocess.PIPE)
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    try:
        proc = await stream_subprocess(cmd, *args, stdout_callback=stdout_callback)
        yield proc
    finally:
        await proc.wait()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)


    async def main():
        async with open_subprocess('ping', '-c', '4', 'localhost',
                                   stdout_callback=stdout_callback) as proc:
            # The following code only runs after proc completes
            for i in range(2):
                print(f'RUNNING SUBPROCESS {proc.pid}...')
                time.sleep(1)

        print(f'RETURN CODE: {proc.returncode}')

    asyncio.run(main())

Пример рабочего вывода:

STDOUT: PING localhost (127.0.0.1): 56 data bytes
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.048 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.074 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=2 ttl=64 time=0.061 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=3 ttl=64 time=0.067 ms
STDOUT: 
STDOUT: --- localhost ping statistics ---
STDOUT: 4 packets transmitted, 4 packets received, 0.0% packet loss
STDOUT: round-trip min/avg/max/stddev = 0.048/0.062/0.074/0.010 ms
RUNNING SUBPROCESS 35439...
RUNNING SUBPROCESS 35439...
RETURN CODE: 0

Process finished with exit code 0
person RomanPerekhrest    schedule 01.08.2019
comment
Я полагаю, я был неясен. Я ожидаю, что строки RUNNING SUBPROCESS будут чередоваться со строками STDOUT. Здесь тело контекста не вводится до завершения процесса. - person Johann; 01.08.2019
comment
@Johanne, ты не упомянул о чередовании, у тебя не было менеджера асинхронного контекста - теперь он у тебя есть. Чтобы получить сложное межпроцессное взаимодействие, вам необходимо разработать все правила и представить ожидаемый результат. - person RomanPerekhrest; 01.08.2019
comment
Я думал, что ясно выразился в утверждении (комментарий в коде), я хотел бы получить proc во время его работы. В моем примере кода и в вашем контекст уступает только после завершения процесса. Обратите внимание, что оба наших примера дают одинаковый результат. Я скопирую этот комментарий в прозу вопроса. - person Johann; 01.08.2019