максимальная ошибка рекурсии при использовании futures.ProcessPoolExecutor, но не futures.ThreadPoolExecutor с оболочкой PRAW

Я использую этот код для очистки API:

submissions = get_submissions(1)
with futures.ProcessPoolExecutor(max_workers=4) as executor:
#or using this: with futures.ThreadPoolExecutor(max_workers=4) as executor:
    for s in executor.map(map_func, submissions):
        collection_front.update({"time_recorded":time_recorded}, {'$push':{"thread_list":s}}, upsert=True)

Он отлично работает/быстро с потоками, но когда я пытаюсь использовать процессы, я получаю полную очередь и эту ошибку:

  File "/usr/local/lib/python3.4/dist-packages/praw/objects.py", line 82, in __getattr__
    if not self.has_fetched:
RuntimeError: maximum recursion depth exceeded
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/usr/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/usr/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/usr/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full

Traceback (most recent call last):
  File "reddit_proceses.py", line 64, in <module>
    for s in executor.map(map_func, submissions):
  File "/usr/lib/python3.4/concurrent/futures/_base.py", line 549, in result_iterator
    yield future.result()
  File "/usr/lib/python3.4/concurrent/futures/_base.py", line 402, in result
    return self.__get_result()
  File "/usr/lib/python3.4/concurrent/futures/_base.py", line 354, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Обратите внимание, что изначально процессы работали отлично и очень быстро при извлечении небольших данных, но теперь они не работают вообще. Это ошибка или что происходит, когда объект PRAW вызывает ошибку рекурсии с процессами, но не с потоками?


comment
Очередь, на которую он жалуется, на самом деле является внутренней очередью с именем call_queue. Он определяется максимальным размером: self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS). Это в процессе выключения Executor, когда возникает эта ошибка. Похоже, вы, возможно, отрезали верхнюю часть первой трассировки. Вы можете включить это? Я сталкивался с этой ошибкой queue.Full раньше, но это было потому, что я взламывал concurrent.futures.ProcessPoolExecutor и менял его поведение/время выключения. Мне интересно, возможно, вы столкнулись с редкой ошибкой.   -  person dano    schedule 12.06.2015
comment
@dano Я прикрепил более полную ошибку выше в исходном вопросе.   -  person sunny    schedule 12.06.2015
comment
Хм, это похоже на возможный триггер: RuntimeError: maximum recursion depth exceeded. Кажется, это исходит от /usr/local/lib/python3.4/dist-packages/praw/objects.py. Есть мысли по этому поводу?   -  person dano    schedule 12.06.2015
comment
@dano тьфу, теперь это дает мне эту ошибку рекурсии даже сейчас, когда я вернулся к получению небольших объемов информации. код вообще не изменился..... и я не получаю эту ошибку рекурсии, когда использую ThreadPoolExecutor вместо ProcessPoolExecutor.   -  person sunny    schedule 12.06.2015


Ответы (1)


У меня была аналогичная проблема при переходе от потоков к процессам, только я использовал executor.submit. Я думаю, что это может быть та же проблема, что и у вас, но я не могу быть уверен, потому что я не знаю, в каком контексте работает ваш код.

В моем случае произошло следующее: я запускал свой код как скрипт и не использовал всегда рекомендуемый if __name__ == "__main__":. Похоже, что при запуске нового процесса с исполнителем python загружает файл .py и запускает функцию, указанную в submit. Поскольку он загружает файл, код, который существует в основном файле (не внутри функций или приведенного выше предложения if), запускается, поэтому каждый процесс будет снова запускать новый процесс, имеющий бесконечную рекурсию.

Похоже, что это не происходит с потоками.

person dyeray    schedule 01.10.2015
comment
Если я правильно понимаю, у меня тоже была похожая проблема. В Windows все, что использует multiprocessing, должно иметь multiprocessing.freeze_support() перед любым кодом, который не должен запускаться перед рабочей функцией в подпроцессах. Это потому, что из-за отсутствия fork() в Windows подпроцессы выполняются при запуске одной и той же командной строки с дополнительными аргументами. freeze_support() ищет эти аргументы и запускает платформу multiprocessing вместо следующего кода, если они присутствуют. - person ivan_pozdeev; 02.10.2015
comment
В моем случае multiprocessing.freeze_support() не имеет значения. Может быть связано, но это не похоже на ту же проблему. В документации Python говорится, что заморозка_поддержка действует только в сценариях, замороженных до исполняемого файла, созданных с помощью py2exe или аналогичных. Я тоже на Windows, но я не знаю, будет ли поведение на других ОС таким же или нет. - person dyeray; 02.10.2015
comment
В документации Python говорится, что замораживание_поддержка действует только в сценариях, замороженных в exe - да, это так, но это не так (по крайней мере, этого не было, когда я в последний раз имел с этим дело). - person ivan_pozdeev; 02.10.2015