Карта общего пула между процессами с объектно-ориентированным python

(python2.7)

Я пытаюсь сделать своего рода сканер, который должен проходить через узлы CFG и разделяться на разные процессы при ветвлении для целей параллелизма.

Сканер представлен объектом класса Scanner. Этот класс имеет один метод traverse, который проходит по указанному графу и при необходимости разделяет его.

Вот как это выглядит:

class Scanner(object):
    def __init__(self, atrb1, ...):
       self.attribute1 = atrb1
       self.process_pool = Pool(processes=4)
    def traverse(self, ...):
        [...]
        if branch:
           self.process_pool.map(my_func, todo_list).

Моя проблема заключается в следующем: как мне создать экземпляр multiprocessing.Pool, который будет использоваться всеми моими процессами? Я хочу, чтобы он был общим, потому что, поскольку путь можно снова разделить, я не хочу заканчиваться чем-то вроде форк-бомбы, а наличие одного и того же пула поможет мне ограничить количество процессов, выполняющихся одновременно.

Вышеприведенный код не работает, т.к. пул нельзя замариновать. В результате я попробовал это:

class Scanner(object):
   def __getstate__(self):
      self_dict  = self.__dict__.copy()
      def self_dict['process_pool']
      return self_dict
    [...]

Но, очевидно, это приводит к тому, что self.process_pool не определен в созданных процессах.

Затем я попытался создать пул как атрибут модуля:

process_pool = Pool(processes=4)

def my_func(x):
    [...]

class Scanner(object):
    def __init__(self, atrb1, ...):
       self.attribute1 = atrb1
    def traverse(self, ...):
        [...]
        if branch:
           process_pool.map(my_func, todo_list)

Это не работает, и этот ответ объясняет, почему. Но вот в чем дело, где бы я ни создавал свой пул, чего-то не хватает. Если я создам этот пул в конце своего файла, он не увидит self.attribute1, так же как не увидит answer и завершается с ошибкой AttributeError.

Я даже не пытаюсь поделиться этим, и я уже застрял в многопроцессорном способе работы.

Я не знаю, правильно ли я думал обо всем этом, но я не могу поверить, что так сложно справиться с чем-то таким простым, как «иметь пул рабочих и давать им задачи».

Спасибо,

EDIT: я решил свою первую проблему (AttributeError), мой класс имел обратный вызов в качестве атрибута, и этот обратный вызов был определен в основном файле сценария после импорта модуля сканера... Но параллелизм и «не разветвляйте бомбу» по-прежнему являются проблемой.


person Pierre De Abreu    schedule 18.04.2016    source источник


Ответы (1)


То, что вы хотите сделать, не может быть сделано безопасно. Подумайте, был ли у вас каким-то образом один общий Pool, совместно используемый родительским и рабочим процессами, скажем, двумя рабочими процессами. Родитель запускает map, который пытается выполнить две задачи, и каждой задаче нужно map еще две задачи. Две родительские отправленные задачи отправляются каждому рабочему и родительским блокам. Каждый рабочий отправляет еще две задачи в общий пул и блокирует их выполнение. Но теперь все рабочие теперь заняты, ожидая освобождения рабочего; вы зашли в тупик.

Более безопасный подход состоял бы в том, чтобы работники возвращали достаточно информации для отправки дополнительных задач в родителя. Тогда вы можете сделать что-то вроде:

class MoreWork(object):
    def __init__(self, func, *args):
        self.func = func
        self.args = args

pool = multiprocessing.Pool()
try:
    base_task = somefunc, someargs
    outstanding = collections.deque([pool.apply_async(*base_task)])
    while outstanding:
        result = outstanding.popleft().get()
        if isinstance(result, MoreWork):
            outstanding.append(pool.apply_async(result.func, result.args))
        else:
            ... do something with a "final" result, maybe breaking the loop ...
finally:
     pool.terminate()

Что это за функции, зависит от вас, они просто возвращали информацию в MoreWork, когда нужно было что-то сделать, а не запускали задачу напрямую. Смысл в том, чтобы гарантировать, что если родительский элемент будет нести единоличную ответственность за отправку задач, а работники - исключительно за завершение задачи, вы не сможете зайти в тупик из-за того, что все рабочие процессы будут заблокированы в ожидании задач, которые находятся в очереди, но не обрабатываются. .

Это также совсем не оптимизировано; в идеале вы бы не блокировали ожидание первого элемента в очереди, если другие элементы в очереди были завершены; гораздо проще сделать это с модулем concurrent.futures, особенно с concurrent.futures.wait для ожидания первого доступного результата из произвольного числа невыполненных задач, но вам понадобится сторонний пакет PyPI, чтобы получить concurrent.futures на Python 2.7.

person ShadowRanger    schedule 18.04.2016