Schedule Jobs 2: Запланируйте повторяющиеся задания/задачи с помощью Celery.
В последнем уроке [1] я продемонстрировал, как планировать задания с помощью Crontab. В этом уроке я продолжаю демонстрировать, как планировать повторяющиеся задания/задачи с помощью Celery.
Обзор
Celery — это популярная и мощная (с открытым исходным кодом) асинхронная очередь задач/работ, основанная на распределенной передаче сообщений. Он поддерживает как планирование, так и постановку задач/работ в очередь. В этом руководстве основное внимание уделяется планированию повторяющихся задач/работ. На приведенной ниже диаграмме показана архитектура типичного приложения для планирования повторяющихся задач/работ.
Вообще говоря, как Очередь задач, Celery состоит из
- производитель, который определяет задачи и отвечает за отправку сообщений о задачах брокеру(ам);
- несколько рабочих процессов, которые получают сообщения о задачах от брокера, выполняют задачи и сохраняют результаты в серверной части результатов (базе данных)
Сам Celery не включает в себя брокера и (результат) бэкэнд. Следовательно, самой очереди задач нужен брокер сообщений и (результат) серверная часть. Celery поддерживает различные мессенджеры брокеры, такие как Redis, RabbitMQ, Amazon SQS и т. д., а также результат бэкенды , такие как AMQP, Redis, Memcached, SQLAlchemy (PostgreSQL, MySQL, SQLite) и т. д.
В качестве Планировщика заданий нам нужен еще один компонент, celery beat, который помещает задачи в очередь сообщений через равные промежутки времени, чтобы, как только поскольку любой рабочий доступен, задачи подбираются и выполняются. Celery beat поддерживает четыре различных способа определения повторяющейся задачи.
- регулярный (временной) интервал повторяющейся задачи: например. проверка состояния датчика раз в 10 секунд
- расписание crontab: например. создание отчета о продажах и отправка его всем заинтересованным сторонам по электронной почте каждый день после полуночи
- солнечное расписание: например. ежедневная запись информации о погоде на закате в определенном географическом месте
- настраиваемое расписание: если планировщик по умолчанию не подходит для вашего варианта использования, например, вы хотите хранить свои расписания и информацию о состоянии в определенной базе данных SQL или NoSQL по вашему выбору вместо локального файла базы данных
shelve
, вам нужно определить свой собственный (пользовательский) планировщик, создав подклассы какcelery.beat.Scheduler
, так иcelery.beat.ScheduleEntry
В этом руководстве мы сосредоточимся на планировщике по умолчанию, celery.beat.PersistentScheduler
, и покажем, как создать планировщик задач, используя Redis в качестве брокера сообщений и PostgreSQL в качестве серверной части.
Шаг 1: Подготовка брокера и серверной части
Сначала мы запускаем сервер Redis и сервер PostgreSQL, используя контейнеры докеров соответственно.
1.1 Подготовка бэкенда результатов
demo@localhost ~ % docker run -d --name demo_backend -p 5432:5432 -e POSTGRES_PASSWORD=dbc postgres:latest 0b205dd31d60ec6c8235219dfdb4088b7f3cf93ace9a40c068562381b49dc9d7 demo@localhost ~ % docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 0b205dd31d60 postgres:latest "docker-entrypoint.s…" 5 seconds ago Up 3 seconds 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp demo_backend demo@localhost ~ % demo@localhost ~ % psql -h localhost -p 5432 -U postgres Password for user postgres: psql (14.0) Type "help" for help. postgres=# SELECT datname, dattablespace FROM pg_catalog.pg_database ORDER BY 2,1 DESC; datname | dattablespace -----------+--------------- template1 | 1663 template0 | 1663 postgres | 1663 (3 rows) postgres=# CREATE DATABASE demo; CREATE DATABASE postgres=# SELECT datname, dattablespace FROM pg_catalog.pg_database ORDER BY 2,1 DESC; datname | dattablespace -----------+--------------- template1 | 1663 template0 | 1663 postgres | 1663 demo | 1663 (4 rows) postgres=#
1.2 Подготовка брокера сообщений
demo@localhost ~ % docker run -d \ > -h localhost \ > -e REDIS_PASSWORD=redis \ > -p 6379:6379 \ > --name demo_broker \ > --restart always \ > redis:latest /bin/sh -c 'redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}' 74d75f5ba433f18d466ee089ff96d95b2f2b038acb03d28f3c410fad90259d90 demo@localhost ~ % demo@localhost ~ % docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 74d75f5ba433 redis:latest "docker-entrypoint.s…" 3 seconds ago Up 3 seconds 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp demo_broker 0b205dd31d60 postgres:latest "docker-entrypoint.s…" 9 minutes ago Up 9 minutes 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp demo_backend demo@localhost ~ % demo@localhost ~ % redis-cli -h localhost -p 6379 localhost:6379> keys * (error) NOAUTH Authentication required. localhost:6379> AUTH redis OK localhost:6379> keys * (empty array) localhost:6379>
Шаг 2: Планирование и выполнение повторяющихся задач
В приведенном ниже примере кода мы определили две периодические задачи: current_weather(city)
и weather_forecast(zip_code)
. Они планируются двумя разными способами (в функции schedule_periodic_tasks()
). Первый запланирован с использованием регулярных интервалов времени и будет вызываться каждые 10 секунд, а второй запланирован с использованием расписания crontab и будет выполняться каждый понедельник утром в 7:30.
Для запуска планировщика задач в терминале выполняется следующая команда.
demo@localhost ~ % celery -A tasks beat --loglevel=INFO -s ./celerybeat-schedule_11-29-21 celery beat v5.1.2 (sun-harmonics) is starting. __ - ... __ - _ LocalTime -> 2021-11-29 00:51:11 Configuration -> . broker -> redis://:**@localhost:6379/0 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> ./celerybeat-schedule_11-29-21 . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2021-11-29 00:51:11,633: INFO/MainProcess] beat: Starting... [2021-11-29 00:51:21,669: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:51:31,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:51:41,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:51:51,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:52:01,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:52:11,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather) [2021-11-29 00:52:21,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)
Из приведенных выше логов в терминале видно, что планировщик задач начал проталкивать задачи в очередь сообщений (брокер), а расписания и статус хранятся в shelve
файле базы данных: celerybeat-schedule_11-29-21
. Проверив сообщения в брокере, мы можем это подтвердить.
localhost:6379> keys * 1) "celery" 2) "_kombu.binding.celery" localhost:6379> type celery list localhost:6379> llen celery (integer) 11 localhost:6379> lrange celery 0 1 1) "{\"body\": \"W1siTG9zIEFuZ2VsZXMiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.current_weather\", \"id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"parent_id\": null, \"argsrepr\": \"['Los Angeles']\", \"kwargsrepr\": \"{}\", \"origin\": \"gen81468@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"reply_to\": \"3a9f95ed-cbee-37aa-8283-faa7df0fc0b4\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"2cc7d91a-d928-4cad-a618-c0b3187f41e0\"}}" 2) "{\"body\": \"W1siTG9zIEFuZ2VsZXMiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.current_weather\", \"id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"parent_id\": null, \"argsrepr\": \"['Los Angeles']\", \"kwargsrepr\": \"{}\", \"origin\": \"gen81468@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"reply_to\": \"3a9f95ed-cbee-37aa-8283-faa7df0fc0b4\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"f4433f4d-88a4-49dc-8344-66c7d30dc7d3\"}}" localhost:6379> localhost:6379>
Пока задачи помещаются в очередь сообщений, но рабочий процесс еще не запущен. Поэтому ни одна из задач не выполняется. Это можно подтвердить, проверив содержимое базы данных результатов.
postgres=# \c demo You are now connected to database "demo" as user "postgres". demo=# \dt Did not find any relations. demo=#
Чтобы задачи, помещенные в очередь, действительно выполнялись, мы начинаем работать в другом окне терминала следующим образом.
demo@localhost ~ % celery -A tasks worker --loglevel=info celery@localhost v5.1.2 (sun-harmonics) macOS-11.6-x86_64-i386-64bit 2021-11-29 01:10:18 [config] .> app: tasks:0x1097804f0 .> transport: redis://:**@localhost:6379/0 .> results: postgresql://postgres:**@localhost:5432/demo .> concurrency: 16 (prefork) .> task events: OFF (enable -E to monitor tasks in this worker) [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.current_weather . tasks.weather_forecast [2021-11-29 01:10:19,592: INFO/MainProcess] Connected to redis://:**@localhost:6379/0 [2021-11-29 01:10:19,606: INFO/MainProcess] mingle: searching for neighbors [2021-11-29 01:10:20,642: INFO/MainProcess] mingle: all alone [2021-11-29 01:10:20,675: INFO/MainProcess] celery@localhost ready. [2021-11-29 01:10:20,679: INFO/MainProcess] Task tasks.current_weather[2e688dce-7d2b-4c5f-a97e-1b1eefd3d359] received [2021-11-29 01:10:20,684: INFO/MainProcess] Task tasks.current_weather[5f4b9b44-5c8b-4393-a64c-e1b511471950] received [2021-11-29 01:10:20,688: INFO/MainProcess] Task tasks.current_weather[31d7a1cd-3284-4c37-a105-ca2e595361e8] received [2021-11-29 01:10:20,693: INFO/MainProcess] Task tasks.current_weather[c1af82ac-5085-44a8-9ed3-8c8340ad7696] received [2021-11-29 01:10:20,697: INFO/MainProcess] Task tasks.current_weather[7fd04e9f-49bc-4bc5-83b2-96d1c0662857] received [2021-11-29 01:10:20,701: INFO/MainProcess] Task tasks.current_weather[bdcad31c-d5d6-43e9-80f5-265e895fc8b8] received [2021-11-29 01:10:20,706: INFO/MainProcess] Task tasks.current_weather[d0faec6e-4973-4054-8cda-0d63f17466f3] received [2021-11-29 01:10:20,976: INFO/MainProcess] Task tasks.current_weather[3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc] received [2021-11-29 01:10:20,983: INFO/MainProcess] Task tasks.current_weather[f551e732-d3fb-4263-b479-93ef41256938] received [2021-11-29 01:10:20,989: INFO/MainProcess] Task tasks.current_weather[8275a0af-fdfd-42e7-a40b-027c37a6410c] received [2021-11-29 01:10:20,994: INFO/MainProcess] Task tasks.current_weather[46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6] received [2021-11-29 01:10:21,521: INFO/ForkPoolWorker-16] Task tasks.current_weather[2e688dce-7d2b-4c5f-a97e-1b1eefd3d359] succeeded in 0.8405978789999997s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:21,548: INFO/ForkPoolWorker-2] Task tasks.current_weather[31d7a1cd-3284-4c37-a105-ca2e595361e8] succeeded in 0.858004432s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:21,944: INFO/ForkPoolWorker-9] Task tasks.current_weather[8275a0af-fdfd-42e7-a40b-027c37a6410c] succeeded in 0.9534888619999999s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:21,977: INFO/ForkPoolWorker-1] Task tasks.current_weather[5f4b9b44-5c8b-4393-a64c-e1b511471950] succeeded in 1.2927104649999999s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:22,312: INFO/ForkPoolWorker-8] Task tasks.current_weather[f551e732-d3fb-4263-b479-93ef41256938] succeeded in 1.3275789269999998s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:22,542: INFO/ForkPoolWorker-3] Task tasks.current_weather[c1af82ac-5085-44a8-9ed3-8c8340ad7696] succeeded in 1.848025523s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:22,880: INFO/ForkPoolWorker-10] Task tasks.current_weather[46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6] succeeded in 1.884862826s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:23,048: INFO/ForkPoolWorker-7] Task tasks.current_weather[3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc] succeeded in 2.0701256270000004s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:23,592: INFO/ForkPoolWorker-6] Task tasks.current_weather[d0faec6e-4973-4054-8cda-0d63f17466f3] succeeded in 2.621252047s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:24,050: INFO/ForkPoolWorker-4] Task tasks.current_weather[7fd04e9f-49bc-4bc5-83b2-96d1c0662857] succeeded in 3.3515452580000002s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87} [2021-11-29 01:10:24,592: INFO/ForkPoolWorker-5] Task tasks.current_weather[bdcad31c-d5d6-43e9-80f5-265e895fc8b8] succeeded in 3.889102834s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}
Из приведенных выше журналов видно, что воркер начал выполнение поставленных в очередь задач. Проверяя бэкэнд результатов (базу данных), мы видим, что первые несколько результатов были сохранены.
demo=# \dt List of relations Schema | Name | Type | Owner --------+--------------------+-------+---------- public | celery_taskmeta | table | postgres public | celery_tasksetmeta | table | postgres (2 rows) demo=# SELECT COUNT(*) FROM celery_taskmeta; count ------- 11 (1 row) demo=# demo=# SELECT task_id, status, result FROM celery_taskmeta; task_id | status | result --------------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 2e688dce-7d2b-4c5f-a97e-1b1eefd3d359 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 31d7a1cd-3284-4c37-a105-ca2e595361e8 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 8275a0af-fdfd-42e7-a40b-027c37a6410c | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 5f4b9b44-5c8b-4393-a64c-e1b511471950 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e f551e732-d3fb-4263-b479-93ef41256938 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e c1af82ac-5085-44a8-9ed3-8c8340ad7696 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e d0faec6e-4973-4054-8cda-0d63f17466f3 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e 7fd04e9f-49bc-4bc5-83b2-96d1c0662857 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e bdcad31c-d5d6-43e9-80f5-265e895fc8b8 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c 09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e (11 rows) demo=#
Результаты, хранящиеся в базе данных, были сериализованы. Для просмотра десериализованных результатов можно использовать либо pickle
, либо SQLAlchemy
(это показано в [2]).
Заключение
В этом руководстве я продемонстрировал, как планировать повторяющиеся задачи, используя как регулярные интервалы времени, так и расписание crontab, и в этом примере мы использовали Redis в качестве брокера и PostgreSQL в качестве серверной части результатов. Полный образец кода, полностью развернутый с помощью docker-compose, доступен в этом репозитории Github.
В отличие от планирования задач с помощью Cron, сельдерей выполняет задачи параллельно. В примере мы запустили только один рабочий процесс, но из журналов мы видим, что он имеет 16 потоков, что указывает на то, что в нашем примере мы можем выполнять до 16 задач одновременно. Если бы было запущено больше рабочих процессов, мы могли бы одновременно выполнять больше задач. Однако, как и в Cron, в Celery beat есть проблема единой точки отказа. Согласно документации,
Вы должны убедиться, что для расписания одновременно работает только один планировщик, иначе вы получите дублирующиеся задачи.
Чтобы планировать повторяющиеся задачи в распределенном стиле, нам нужно либо интегрировать celery beat с распределенной блокировкой, либо использовать совершенно другой инструмент, такой как Chronos, Quartz, Dkron и т. д. В следующем уроке я покажу, как интегрировать celery. beat с распределенной блокировкой, чтобы повторяющиеся задачи можно было планировать в распределенном стиле.
Больше контента на plainenglish.io. Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Получите эксклюзивный доступ к возможностям написания и советам в нашем сообществе Discord.