Если вы не хотите использовать другую библиотеку, вы можете запланировать сопрограмму из потока. Замена queue.put_nowait
следующим работает нормально.
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
Переменная loop
представляет цикл событий в основном потоке.
РЕДАКТИРОВАТЬ:
Причина, по которой ваша async
сопрограмма ничего не делает, заключается в том, что цикл событий никогда не дает ей возможности сделать это. Объект очереди не является потокобезопасным, и если вы покопаетесь в коде cpython, вы обнаружите, что это означает, что put_nowait
будит потребителей очереди за счет использования future с методом call_soon
цикла событий. Если бы мы могли заставить его использовать call_soon_threadsafe
, он должен работать. Однако основное различие между call_soon
и call_soon_threadsafe
заключается в том, что call_soon_threadsafe
пробуждает цикл событий, вызывая _ 11_. Так что назовем это сами:
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
queue._loop._write_to_self()
print(queue.qsize())
@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
Затем все работает как положено.
Что касается аспекта безопасности потоков доступа к общим объектам, asyncio.queue
использует под капотом collections.deque
, который имеет потокобезопасные append
и popleft
. Возможно, проверка того, что очередь не пуста, и popleft не является атомарным, но если вы используете очередь только в одном потоке (одном из цикла событий), все может быть в порядке.
Другие предлагаемые решения, loop.call_soon_threadsafe
из ответа Хуазуо Гао и мой asyncio.run_coroutine_threadsafe
просто делают это, пробуждая цикл событий.
person
cronos
schedule
07.04.2017
"I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device"
- Пробовали ли вы использоватьloop.run_in_executor
для блокирования взаимодействия с последовательным устройством? - person Jashandeep Sohi   schedule 02.10.2015