Apache Airflow Celery Redis DecodeError

Использование последней версии apache airflow. Начал с LocalExecutor, в этом режиме все работало нормально, за исключением некоторых взаимодействий, в веб-интерфейсе указано, что для их использования необходим CeleryExecutor. Установил и настроил исполнитель Celery с Redis, настроил Redis в качестве URL-адреса брокера и серверной части результатов.

Сначала кажется, что он работает, пока задача не будет запланирована, и в этот момент она выдает следующую ошибку:

 File "/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper
    self.executor.heartbeat()
  File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat
    self.sync()
  File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync
    state = async.state
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for
    return self.decode_result(meta)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result
    return self.meta_from_decoded(self.decode(payload))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode
    accept=self.accept)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

Кажется, это ошибка сериализации рассола, но я не уверен, как отследить причину. Какие-либо предложения?

Эта проблема постоянно влияет на рабочий процесс, в котором я использую функцию subdag, возможно, проблема связана с этим.

ПРИМЕЧАНИЕ. Я также тестировал с помощью rabbitMQ, там была другая проблема; клиент показывает «соединение сброшено узлом» и вылетает. Журнал RabbitMQ показывает, что «клиент неожиданно закрыл TCP-соединение».


person James Suffolk    schedule 19.09.2017    source источник


Ответы (3)


Я наткнулся на это, увидев точно такую ​​же обратную трассировку в журналах нашего планировщика:

  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

Тот факт, что celery пытался распаковать что-то, начинающееся с '{', показался мне подозрительным, поэтому я взял tcpdump трафика и запустил задачу через веб-интерфейс. Полученный захват включал этот обмен почти в тот же самый момент, когда указанная выше обратная трассировка появилась в журналах планировщика:

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2"
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}"

Полезная нагрузка ответа от Redis явно JSON, так почему же сельдерей пытается его распаковать? Мы находимся в процессе перехода с Airflow 1.7 на 1.8, и во время развертывания у нас есть один парк рабочих Airflow, работающих под управлением версии 1.7, и другой, работающий под управлением версии 1.8. Рабочие должны были извлекать данные из очередей с непересекающимися рабочими нагрузками, но из-за ошибки в одной из наших DAG у нас был экземпляр TaskInstance, запланированный Airflow 1.8, который затем выполнялся рабочим celery, запущенным через Airflow 1.7.

AIRFLOW-1038 изменил сериализатор для статусов задач celery из JSON (по умолчанию). для рассола, поэтому рабочие, выполняющие версию кода до этого изменения, будут сериализовать результаты в JSON, а планировщики, выполняющие версию кода, включающую это изменение, попытаются десериализовать результаты путем распаковки, что вызывает указанную выше ошибку.

person tronbabylove    schedule 09.10.2017

Пожалуйста, проверьте, какой тип celery_result_backend вы настроили в airflow.cfg. Попробуйте переключить его на серверную часть базы данных (mysql и т. д.), если это не так.

Мы видим, что с бэкендом ampq (доступно только на Celery 3.1 и ниже), бэкендом redis и rpc иногда возникают проблемы.

person Bolke de Bruin    schedule 06.11.2017

Мы столкнулись с этим, наше управление конфигурацией не было должным образом обновлено, поэтому планировщик и некоторые рабочие процессы работали на apache-airflow 1.8.2, в то время как значительная часть рабочих выполняла airflow 1.8.0. Убедитесь, что на всех ваших узлах установлена ​​одна и та же версия воздушного потока.

person Dowowow    schedule 23.08.2019