Как запланировать пользовательские задания с помощью APscheduler в представлении Django

Идея состоит в том, чтобы позволить пользователям отправлять время/дату на сервер, чтобы задание было запланировано для выполнения в указанное пользователем время.

Мой первый выбор состоял в том, чтобы сделать это в представлении, куда отправляются данные из внешнего интерфейса. Однако apscheduler возвращает ошибку значения.

Ниже приведен фрагмент представления, которое я хочу использовать для заданий, запланированных пользователем, и просто небольшой тест.

class SchedulePolicyDeployView(APIView):
 def post(self, request, version):

    def test_job():
        print("cron job test")

    scheduler.start()
    scheduler.add_job(test_job, "cron", id="test_job", day="*", minute="*/1")

    return Response("job scheduled successfully")

Возвращаемое сообщение об ошибке:

ValueError: This Job cannot be serialized since the reference to its callable (<function SchedulePolicyDeployView.post.<locals>.test_job at 0x05694C00>) could not be determined. Consider giving a textual reference (module:function name) instead.

person SharkBark    schedule 20.10.2019    source источник
comment
Вы решили это?   -  person Yunus    schedule 19.07.2020
comment
Да. Пытался написать свой ответ здесь, но не хватило места. Ответим на этот вопрос и отметим его как решенный в ближайшее время.   -  person SharkBark    schedule 20.07.2020


Ответы (1)


Поэтому я исправил эту проблему в своем коде; но он немного изменился с тех пор, как я задал этот вопрос. Я попытался привести все это здесь, хотя:

*при условии, что объект планировщика уже инициализирован

планировщик.py

def schedule_job(arg1, arg2, arg3, time, job_id):
    scheduler.add_job(deploy_policy, 'date', args=[arg1, arg2, arg3], run_date=datetime(int(date['year']), int(date['month']), int(date['day']), int(time['hour']), int(time['minute']), 0))
    return Response({"message": "scheduled!"})

просмотры.py

from scheduler import schedule_job
class SchedulePolicyDeployView(APIView):
    def post(self, request, version):
        schedule_job(apic, username, password, policy_url_and_payload_result, date_dict, time_dict, payload["job_id"])
        return Response({"result": "success"})
person SharkBark    schedule 20.07.2020