Celery worker зависает при работе с скрученным

Ниже приведен фрагмент кода с установленным сельдереем. Здесь я запускаю скрученный реактор в каждом дочернем рабочем процессе сельдерея.

import os
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor
from celery import signals
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('STARTED NEW WORKER', os.getpid())

@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
    reactor.callFromThread(reactor.stop)
    print('REACTOR SHUTDOWN', os.getpid())

class OrgUnitEventHandler():
    @inlineCallbacks
    def process(self, *args, **kwargs):
        val = sum(args)
        yield val
        returnValue(val)

def inThread(x, y): 
    obj = OrgUnitEventHandler()
    output = threads.blockingCallFromThread(reactor,
                                        obj.process, x, y)
    return output

@app.task
def add(x, y):
    print('ADD --> '+str(os.getpid()))
    result = inThread(x, y)
    print("FINAL RESULT--> "+str(result)+"-->"+str(os.getpid()))
    return result

С приведенным выше кодом я вижу, что большую часть времени рабочий сельдерей зависал и освобождался, когда новая задача снова появлялась на том же рабочем. смотрите ниже логи..

[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] 11651
[2020-05-07 20:12:26,498: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:12:26,499: WARNING/ForkPoolWorker-2] 11653
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] 11655
[2020-05-07 20:12:26,533: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:12:26,534: WARNING/ForkPoolWorker-4] 11659
[2020-05-07 20:12:32,239: WARNING/ForkPoolWorker-1] ADD --> 11651   # task-1 hang
[2020-05-07 20:12:36,611: WARNING/ForkPoolWorker-2] ADD --> 11653   # task-2 hang
[2020-05-07 20:13:00,858: WARNING/ForkPoolWorker-3] ADD --> 11655   # task-3 hang
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->11653  # task-2 Done
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-3] FINAL RESULT--> 3-->11655 # task-3 done

Я никогда не вижу проблемы с зависанием сельдерея, когда использую Loopingcall при запуске реактора.

...


class EventLoop(object):
    def _startLoopingCall(self, reactor):
        lc = LoopingCall(print)
        lc.start(1, False)

    def setup(self, reactor):
        reactor.callFromThread(self._startLoopingCall, reactor)
        thread = Thread(target=lambda: reactor.run(installSignalHandlers=False),name="reactor ").start()

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    EventLoop().setup(reactor)
    #thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('STARTED NEW WORKER', os.getpid())

...

Журналы, подтверждающие, что рабочие процессы celery никогда не зависали с приведенным выше кодом:

[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] 12302
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] 12304
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] 12306
[2020-05-07 20:22:08,873: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:22:08,874: WARNING/ForkPoolWorker-4] 12308


[2020-05-07 20:22:12,446: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:12,826: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:15,489: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:15,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:20,475: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:20,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304

Я не могу понять, какую магию LoopingCall делает здесь с искривленным реактором, чтобы заставить его работать правильно. Поделитесь своими идеями


person Prashant Gaur    schedule 07.05.2020    source источник
comment
Вы должны вернуться к доске дизайна и переосмыслить подход. Скрученные внутренние компоненты плохо себя ведут при запуске в таких потоках. Ваша реализация запустит реактор в потоке, отличном от основного потока, и это только запутает и усложнит отладку (как вы теперь обнаружили). Однако запустить один Twisted Reactor в потоке для выполнения асинхронных задач в фоновом режиме можно с помощью связать. Я думаю, вы в основном пытаетесь это сделать. Не очень понятно, почему вы запускаете Twisted в потоке или зачем вам вообще нужен Twisted.   -  person notorious.no    schedule 11.05.2020
comment
@ notorious.no Я вижу то же самое поведение с вязанием крючком, что у него есть LoopingCall каждые 0,1 секунды для reapallprocesses. (github.com/itamarst/crochet/blob/master/crochet /), я не уверен, зачем делать эти вызовы здесь в вязании крючком?   -  person Prashant Gaur    schedule 14.05.2020