Я работаю над задачей celery beat в рамках проекта django, который периодически создает записи в базе данных. Я знаю это, потому что когда я поставил задачу следующим образом:
сельдерей.ру:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
app = Celery("clock-backend", broker=os.environ.get("RABBITMQ_URL"))
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.beat_schedule = {
'create_reports_monthly': {
'task': 'project_celery.tasks.create_reports_monthly',
'schedule': 10.0,
},
}
app.autodiscover_tasks()
И запустите мой проект, он действительно создает объект каждые 10 секунд.
Но что я действительно хочу сделать, так это настроить его на запуск каждый первый день месяца.
Для этого я бы изменил "schedule": crontab(0, 0, day_of_month="1")
.
Вот моя реальная проблема: как я могу проверить, что это действительно работает?
И под тестированием я подразумеваю фактические (модульные) тесты.
Что я пробовал, так это работать с пакетом под названием freezegun. Тест с этим выглядит так:
def test_start_of_month_report_creation(self, user_object, contract_object, report_object):
# set time to the last day of January
with freeze_time("2019-01-31 23:59:59") as frozen_time:
# let one second pass
frozen_time.tick()
# give the celery task some time
time.sleep(20)
# Test Logic to check whether the object was created
# Example: assert MyModel.objects.count() > 0
Но это не сработало. Я подозреваю, что ритм сельдерея использует не время, установленное через freezgun/python, а настоящие «аппаратные» часы.
Я также пытался установить аппаратные часы, например здесь, но это не сработало в моя установка.
Я благодарен за любые комментарии, замечания или помощь по этой теме, так как я действительно хотел бы реализовать тест для этого.