Как предотвратить повышение asyncio.TimeoutError и продолжить цикл

Я использую aiohttp с методом limited_as_completed для ускорения очистки (около 100 миллионов статических страниц веб-сайта). Однако код останавливается через несколько минут и возвращает ошибку TimeoutError. Я пробовал несколько вещей, но все равно не смог предотвратить повышение asyncio.TimeoutError. Могу я спросить, как я могу игнорировать ошибку и продолжить?

Код, который я запускаю:

N=123
import html
from lxml import etree
import requests
import asyncio 
import aiohttp
from aiohttp import ClientSession, TCPConnector
import pandas as pd
import re 
import csv 
import time
from itertools import islice
import sys
from contextlib import suppress

start = time.time()
data = {}
data['name'] = []
filename = "C:\\Users\\xxxx"+ str(N) + ".csv"

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        with suppress(asyncio.TimeoutError):
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')

limit = 1000
async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

url = "http://xxx.{}.html"
loop = asyncio.get_event_loop()

async def main():
    connector = TCPConnector(limit=10)
    async with ClientSession(connector=connector,headers=headers,raise_for_status=False) as session:
        coros = (get_info_byid(i, url.format(i), session) for i in range(N,N+1000000))
        await print_when_done(coros)

loop.run_until_complete(main())
loop.close()
print("took", time.time() - start, "seconds.")

Журнал ошибок:

Traceback (most recent call last):
  File "C:\Users\xxx.py", line 111, in <module>
    loop.run_until_complete(main())
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "C:\Users\xxx.py", line 109, in main
    await print_when_done(coros)
  File "C:\Users\xxx.py", line 98, in print_when_done
    await res
  File "C:\Users\xxx.py", line 60, in first_to_finish
    return f.result()
  File "C:\Users\xxx.py", line 65, in get_info_byid
    async with session.get(url,timeout=20) as resp:
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 855, in __aenter__
    self._resp = await self._coro
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 391, in _request
    await resp.start(conn)
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client_reqrep.py", line 770, in start
    self._continue = None
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\helpers.py", line 673, in __exit__
    raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError

Я пробовал 1) добавить ожидаемый asyncio.TimeoutError: pass. Не работает

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        try:
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')
        except asyncio.TimeoutError:
            pass

2) подавить (asyncio.TimeoutError), как показано выше. Не работает

Я только вчера изучил aiohttp, так что, может быть, в моем коде есть другие ошибки, которые вызывают ошибку тайм-аута только после нескольких минут работы? Большое спасибо, если кто знает, как с этим бороться!


person SiriusBlack    schedule 29.10.2018    source источник
comment
Попробуйте обернуть async with session.get(url,timeout=20) as resp: в try except.   -  person Yurii Kramarenko    schedule 30.10.2018
comment
@YuriiKramarenko Большое спасибо! Это сработало для некоторых ошибок, но через некоторое время у asyncio появляются другие новые ошибки, такие как проблемы с памятью. Возможно, мне придется либо вернуться к запросам, либо использовать пакет для вызова py для небольших диапазонов, чтобы я мог передавать память или случайные ошибки.   -  person SiriusBlack    schedule 08.11.2018
comment
Я думаю, что лучшее решение - это написать простую оболочку для session.get(), которая позволит вам делать запросы без контекстного менеджера и использовать его с сбором.   -  person Yurii Kramarenko    schedule 08.11.2018
comment
Добавить небольшой пример к ответам   -  person Yurii Kramarenko    schedule 08.11.2018


Ответы (4)


Простой пример (не очень хорошо, но работает нормально):

import asyncio
from aiohttp.client import ClientSession


class Wrapper:

    def __init__(self, session):
        self._session = session

    async def get(self, url):
        try:
            async with self._session.get(url, timeout=20) as resp:
                return await resp.text()
        except Exception as e:
            print(e)


loop = asyncio.get_event_loop()
wrapper = Wrapper(ClientSession())

responses = loop.run_until_complete(
    asyncio.gather(
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com')
    )
)

print(responses)
person Yurii Kramarenko    schedule 08.11.2018
comment
Еще раз спасибо за вашу помощь. Мне потребовалось некоторое время, чтобы понять, как включить обертку в мою структуру. (Возможно, я ошибаюсь, но) Кажется, оболочка просит python записывать в файл csv только после того, как будет сделана сборка данных. Однако время написания здесь не является ограничением. Кажется, что сервер веб-сайта может обрабатывать только ограниченные запросы каждую минуту, и если я разрешаю более 2 запросов (лимиты > 2) одновременно, часто будет появляться ошибка 503. - person SiriusBlack; 18.11.2018

то, что сделал @Yurii Kramarenko, наверняка вызовет исключение незакрытого сеанса клиента, поскольку сеанс никогда не закрывался должным образом. Я рекомендую что-то вроде этого:

import asyncio
import aiohttp

async def main(urls):
    async with aiohttp.ClientSession(timeout=self.timeout) as session:
        tasks=[self.do_something(session,url) for url in urls]
        await asyncio.gather(*tasks)
person jbxiaoyu    schedule 29.02.2020

Мне нравится ответ @jbxiaoyu, но тайм-аут kwarg, похоже, принимает специальный объект, поэтому я подумал, что добавлю, что вам нужно создать объект ClientTimeout, а затем передать его в сеанс, например так:

from aiohttp import ClientSession, ClientTimeout
timeout = ClientTimeout(total=600)
async with ClientSession(timeout=timeout) as session:
    tasks=[self.do_something(session,url) for url in urls]
    await asyncio.gather(*tasks)
person Anthony Roberts    schedule 23.06.2020

Когда я получил эту ошибку, я понял, что не подключен к vpn. Поэтому я предлагаю вам проверить, где вы делаете запрос.

person Rifat Dinc    schedule 24.05.2021