Случайные ошибки соединения с aio_pika после 2-х дней работы

У меня есть скрипт asyncio, который подключается к rabbitmq с библиотекой aio_pika каждые 40 секунд, проверяет, есть ли какие-либо сообщения, и распечатывает их, что затем повторяется вечно. Однако, как правило, примерно через 2 дня работы я начинаю получать бесконечные ошибки исключения соединения, которые можно решить только путем перезапуска скрипта. Возможно, в логике моего асинхронного скрипта есть очевидные ошибки, которые я упускаю?

#!/usr/bin/python3
import time
import async_timeout
import asyncio
import aio_pika

async def got_message(message: aio_pika.IncomingMessage):
    with message.process():
        print(message.body.decode())

async def main(loop):
    try:
        with async_timeout.timeout(10):
            connection = await aio_pika.connect_robust(
                host='#', 
                virtualhost='#', 
                login='#', 
                password='#',
                port=5671,
                loop=loop, 
                ssl=True
            )

            channel = await connection.channel()

            await channel.set_qos(prefetch_count=100)

            queue_name='mm_message'
            queue = await channel.declare_queue(auto_delete=False, name=queue_name)

            routing_key='mm_msg'
            await queue.bind("amq.topic", routing_key)
            que_len = queue.declaration_result.message_count
            if(que_len > 0):
                await queue.consume(got_message)
    except:
        print("connection problems..")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    while(True):
        time.sleep(40)
        loop.run_until_complete(main(loop))

Это ошибка, которую я бесконечно получаю через некоторое время:

Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/adapters/base_connection.py", line 364, in _handle_events
    self._handle_read()
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/adapters/base_connection.py", line 415, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1347, in _on_data_available
    self._process_frame(frame_value)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1414, in _process_frame
    if self._process_callbacks(frame_value):
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1384, in _process_callbacks
    frame_value)  # Args
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/callback.py", line 60, in wrapper
    return function(*tuple(args), **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/callback.py", line 236, in process
    callback(*args, **keywords)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1332, in _on_connection_tune
    self._send_connection_open()
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1517, in _send_connection_open
    self._on_connection_open, [spec.Connection.OpenOk])
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1501, in _rpc
    self._send_method(channel_number, method_frame)
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1569, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python3.5/dist-packages/aio_pika/pika/connection.py", line 1548, in _send_frame
    raise exceptions.ConnectionClosed
aio_pika.pika.exceptions.ConnectionClosed

person A. Smith    schedule 01.04.2019    source источник


Ответы (1)


except:
    print("connection problems..")

Это перехватит исключения службы, такие как KeyboardInterrupt, SystemExit. Вам следует никогда не делать такого, если вы не собираетесь повторно вызывать исключение. Как минимум вы должны написать:

except Exception:
    print("connection problems..")

Однако в контексте фрагмента asyncio выше будет нарушен механизм отмены . Чтобы избежать этого, как описано здесь, вы должны написать:

try:
    await operation
except asyncio.CancelledError:
    raise
except Exception:
    log.log('an error has occurred')

Не менее важно понимать, что соединение должно не только открываться, но и закрываться (вне зависимости от того, что произошло между открытием и закрытием). Для этого обычно используют менеджеры контекста (а в asyncio - асинхронные менеджеры контекста).

aio_pika вроде бы не исключение. Как показывает example, вы должны использовать async with при работе с соединением:

connection = await aio_pika.connect_robust(
    "amqp://guest:[email protected]/", loop=loop
)

async with connection:
    # ...
person Mikhail Gerasimov    schedule 01.04.2019
comment
Следует ли пытаться... кроме использования с каждой операцией ожидания? Спасибо за ваш вклад, это действительно полезно. - person A. Smith; 01.04.2019
comment
@ А.Смит нет, не должно. Я говорю следующее: любой ожидаемый объект потенциально может вызвать CancelledError, и вы не должны препятствовать его распространению, если только вы не отменили этот ожидаемый объект явным образом. - person Mikhail Gerasimov; 01.04.2019