Есть ли способ использовать asyncio.Queue в нескольких потоках?

Предположим, у меня есть следующий код:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

Проблема с этим кодом в том, что цикл внутри async сопрограммы никогда не завершает первую итерацию, а размер queue увеличивается.

Почему так происходит и что я могу сделать, чтобы это исправить?

Я не могу избавиться от отдельного потока, потому что в моем реальном коде я использую отдельный поток для связи с последовательным устройством, и я не нашел способа сделать это с помощью asyncio.


person Aleksandr Kovalev    schedule 01.10.2015    source источник
comment
"I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device" - Пробовали ли вы использовать loop.run_in_executor для блокирования взаимодействия с последовательным устройством?   -  person Jashandeep Sohi    schedule 02.10.2015


Ответы (4)


asyncio.Queue не поддерживает потоки, поэтому вы можете Не используйте его напрямую из более чем одного потока. Вместо этого вы можете использовать janus, стороннюю библиотеку, которая предоставляет asyncio очередь:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

Также существует aioprocessing (полное раскрытие: я написал это), который обеспечивает безопасность процесса (и как побочный эффект, потокобезопасные) очереди, но это излишне, если вы не пытаетесь использовать multiprocessing.

Изменить

Как указано в других ответах, для простых случаев использования вы можете использовать _ 7_, чтобы также добавить его в очередь.

person dano    schedule 01.10.2015
comment
NameError: name 'async' is not defined дан в asyncio.Task(async(queue.async_q)). Что мне делать? - person Stam Kaly; 22.12.2016
comment
@StamKaly Извините, используйте asyncio.async или даже лучше asyncio.ensure_future, потому что asyncio.async устарел. - person dano; 22.12.2016

BaseEventLoop.call_soon_threadsafe уже под рукой. Подробнее см. asyncio doc.

Просто измените свой threaded() следующим образом:

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))

Вот пример вывода:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
person Huazuo Gao    schedule 03.10.2015

Если вы не хотите использовать другую библиотеку, вы можете запланировать сопрограмму из потока. Замена queue.put_nowait следующим работает нормально.

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)

Переменная loop представляет цикл событий в основном потоке.

РЕДАКТИРОВАТЬ:

Причина, по которой ваша async сопрограмма ничего не делает, заключается в том, что цикл событий никогда не дает ей возможности сделать это. Объект очереди не является потокобезопасным, и если вы покопаетесь в коде cpython, вы обнаружите, что это означает, что put_nowait будит потребителей очереди за счет использования future с методом call_soon цикла событий. Если бы мы могли заставить его использовать call_soon_threadsafe, он должен работать. Однако основное различие между call_soon и call_soon_threadsafe заключается в том, что call_soon_threadsafe пробуждает цикл событий, вызывая _ 11_. Так что назовем это сами:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

Затем все работает как положено.

Что касается аспекта безопасности потоков доступа к общим объектам, asyncio.queue использует под капотом collections.deque, который имеет потокобезопасные append и popleft. Возможно, проверка того, что очередь не пуста, и popleft не является атомарным, но если вы используете очередь только в одном потоке (одном из цикла событий), все может быть в порядке.

Другие предлагаемые решения, loop.call_soon_threadsafe из ответа Хуазуо Гао и мой asyncio.run_coroutine_threadsafe просто делают это, пробуждая цикл событий.

person cronos    schedule 07.04.2017
comment
Это сработало для меня правильно. Я тестировал, и он обменивается сообщениями между потоком и курстиной. - person eduardosufan; 24.07.2020

А как насчет простого использования threading.Lock с помощью asyncio.Queue?

class ThreadSafeAsyncFuture(asyncio.Future):
    """ asyncio.Future is not thread-safe
    https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented


class ThreadSafeAsyncQueue(queue.Queue):
    """ asyncio.Queue is not thread-safe, threading.Queue is not awaitable
    works only with one putter to unlimited-size queue and with several getters
    TODO: add maxsize limits
    TODO: make put corouitine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                return super().get()
            else:
                fut = ThreadSafeAsyncFuture()
                self.waiters.append(fut)
        result = await fut
        return result

См. Также - asyncio: ждать события из другого потока

person vladimirfol    schedule 03.09.2018
comment
Мне нравится идея, у меня аналогичная ситуация, когда я хочу, чтобы блокировка была помещена из другого потока, а не из цикла событий asyncio. Имейте в виду, что ваш фрагмент кода неправильно обрабатывает job_complete. если ожидающая сопрограмма вызывает task_done(), очередь может возрасти, потому что она никогда не была уведомлена об элементе, находящемся в очереди изначально (он был непосредственно помещен в будущее). - person Marti Nito; 29.11.2019