Является ли это предполагаемым шаблоном для использования сопрограмм Python `asyncio`?

Я пытаюсь написать небольшой обработчик параллельного потока с помощью Slack RTM API, и мне интересно, является ли это наиболее эффективным использованием сопрограмм Python. Пакет asyncio имеет массу опций, но трудно определить, какой подход является правильным для проекта, и документация, я думаю, не очень хорошо объясняет, в чем преимущества/недостатки каждого из них.

Я думаю, что мне не нужны накладные расходы на несколько потоков, и мне нужна взаимосвязь между асинхронными циклами. Должен ли я создать отдельный BaseEventLoop для каждой из моих функций?

Будучи Python, я думаю, что здесь есть почти-детерминированный ответ на этот вопрос (There should be one-- and preferably only one --obvious way to do it), но я боюсь, что добавление всего этого асинхронного хлама может просто сделать мой код менее производительным, чем полностью последовательная наивная реализация .

# Is this the best way to communicate between coroutines? 
incoming_message_q = asyncio.Queue()

async def print_event():
    logging.info("Entering log loop")

    # Should this operate within it's own BaseEventLoop? 
    while True:
        event = await incoming_message_q.get()
        logging.info(event)

async def log_queue_status():
    while True:
        logging.info(incoming_message_q.qsize())
        await asyncio.sleep(5)

async def read_rtm_connection(client, q):
    if client.rtm_connect():
        logging.info("Successful Slack RTM connection")
        while True:

            # How do I make this part non-blocking?
            events = client.rtm_read()

            for event in events:
                logging.info("Putting onto the queue", event)
                if event["type"] == "presence_change":
                    await q.put(event)
                elif event["type"] == "message":
                    await q.put(event)
                else:
                    logging.info("Not sure what to do")

            await asyncio.sleep(0.1)
    else:
        logging.info("RTM connection failed.")

loop = asyncio.get_event_loop()
loop.create_task(print_event())
loop.create_task(log_queue_status())
loop.create_task(read_rtm_connection(client, incoming_message_q))
loop.run_forever()

person dalanmiller    schedule 27.01.2016    source источник
comment
Я не знаком со Slack или с тем, что такое метод rtm_read(), так что имейте в виду: чтобы сделать что-то неблокирующим, это должна быть ожидаемая сопрограмма, которую вы можете запустить как асинхронную задачу с помощью sure_future(). Если rtm_read() не является сопрограммой, вы можете попробовать обернуть ее в сопрограмму, которая каким-то образом опрашивает rtm_read(), чтобы получать новые события и передавать их другим вашим методам. В зависимости от вашего случая вы можете передавать сообщения о событиях прямо в очередь или иным образом возвращать будущее каждый раз, когда получено событие, и использовать цикл for-in внутри сопрограммы для их итерации.   -  person songololo    schedule 28.01.2016


Ответы (1)


Если вы хотите взаимодействовать со slack асинхронным способом, вам понадобится неблокирующий API. Я не уверен, что вы сейчас используете, но если он не включает никаких сопрограмм asyncio, его, вероятно, будет нелегко интегрировать в asyncio, если только вы не запустите все блокирующие вызовы в фоновом потоке через loop.run_in_executor. Другим вариантом было бы фактически преобразовать все базовые блокирующие вызовы ввода-вывода в библиотеке в неблокирующие, что, как правило, представляет собой целую кучу работы.

Хорошая новость заключается в том, что есть по крайней мере одна библиотека, которая уже сделала эту работу за вас; slacker-asyncio, который является ответвлением slacker. Вы должны иметь возможность использовать это для взаимодействия с RTM API через сопрограммы.

person dano    schedule 28.01.2016
comment
Спасибо за информацию @dano. Я просматривал библиотеку slackhq/python-slackclient вчера после того, как опубликовал это, и увидел, что она действительно очень блокирует. Если бы я выполнил эту часть кода в loop.run_in_executor, как вы говорите, какой был бы безопасный способ связи между сопрограммами и потоком slackclient? - person dalanmiller; 28.01.2016
comment
@dalanmiller Если вы просто выполняете методы для объекта client, как в приведенном выше примере, вы можете просто выполнить events = loop.run_in_executor(None, client.rtm_read). asyncio автоматически вернет результат вызова client.rtm_read(). - person dano; 29.01.2016
comment
Если у меня есть другие run_in_executor, будет ли безопасно для них также получать доступ к очередям? AKA с run_in_executor Мне нужно беспокоиться о потокобезопасности, да? - person dalanmiller; 29.01.2016
comment
@dalanmiller В вашем примере кода ничто не обращается к очереди из-за пределов потока цикла событий, поэтому вы можете продолжать использовать asyncio.Queue. Если у вас есть код, работающий в других потоках, которым требуется доступ к очереди, вам придется использовать либо threading.Queue через run_in_executor, либо реализацию очереди библиотеки janus (как обсуждалось здесь). - person dano; 29.01.2016
comment
@dalanmiller Кроме того, код в моем предыдущем комментарии должен был читаться как event = await loop.run_in_executor(None, client.rtm_read). - person dano; 29.01.2016
comment
Еще один вопрос @dano, если вы не возражаете. Было бы более или менее эффективно запускать весь конвейер обработки в одной функции async? Или разделить его на отдельные петли, как я сделал здесь? Или трудно сказать? - person dalanmiller; 29.01.2016
comment
@dalanmiller Ну, прямо сейчас все ваши другие циклы — это print/log; они не делают ничего асинхронного. Таким образом, отделение их от основной функции не даст выигрыша в производительности. Накладные расходы на отправку данных через очередь невелики, но, очевидно, выше, чем просто регистрация непосредственно в read_rtm_connection. Я бы сказал, что очередь действительно полезна только в том случае, если вам нужно выполнить ввод-вывод, который использует await. - person dano; 31.01.2016