Как инициировать следующий запрос перед выходом в асинхронном генераторе в python

Я пытаюсь получить некоторые данные из API с разбивкой на страницы (в частности, github, но API не имеет значения для этого вопроса). Я использую асинхронный генератор Python для получения каждой отдельной строки с каждой страницы. Код выглядит примерно так:

async def get_data():
    cursor = None
    with aiohttp.ClientSession() as session:
        while True:
            async with session.get(build_url(cursor)):
                data = await response.json()

            yield from get_rows(data)

            if not has_next_page(data):
                return

            cursor = get_next_cursor(data)

Итак, это в принципе работает. Однако одним из незначительных недостатков является то, что он не инициирует следующий запрос до тех пор, пока не будут получены все строки с текущей страницы. Есть ли хороший способ инициировать эту обработку внутри этого цикла, прежде чем начать выход? В частности, я хочу убедиться, что async with по-прежнему правильно оценивается при выполнении asyncio.ensure_future, который является API для запуска фоновой работы.


person Lucretiel    schedule 26.06.2018    source источник
comment
Работает ли yield from в async def?   -  person user4815162342    schedule 27.06.2018
comment
Пока get_rows не асинхронный, да.   -  person Lucretiel    schedule 28.06.2018
comment
Какая это версия Python? В версии 3.6 я получаю синтаксическую ошибку: SyntaxError: 'yield from' inside async function   -  person user4815162342    schedule 28.06.2018


Ответы (1)


Для этого вам понадобится как минимум одна дополнительная сопрограмма, и соедините их с помощью asyncio.Queue:

async def get_data():
    queue = asyncio.Queue()

    async def fetch_all_pages():
        cursor = None
        with aiohttp.ClientSession() as session:
            while True:
                async with session.get(build_url(cursor)):
                    data = await response.json()

                await queue.put(data)

                if not has_next_page(data):
                    # signal the peer to exit
                    await queue.put(None)
                    break

                cursor = get_next_cursor(data)

    asyncio.ensure_future(fetch_all_pages())

    while True:
        data = await queue.get()
        if not data:
            break
        yield from get_rows(data)
person Fantix King    schedule 27.06.2018