Менеджер многопроцессорной обработки Python и совместное использование составных шаблонов

Я пытаюсь поделиться составной структурой через диспетчер многопроцессорной обработки, но столкнулся с проблемой "RuntimeError: превышена максимальная глубина рекурсии" при попытке использовать только один из методов класса Composite.

Класс является токеном из code.activestate и протестирован мной перед включением в менеджер .

При извлечении класса в процесс и вызове его метода addChild() я сохранил RunTimeError, а вне процесса он работает.

Составной класс наследуется от класса SpecialDict, реализующего метод ** ____getattr()____ **.

Возможно ли, что при вызове addChild() интерпретатор python ищет другой ** ____getattr()____ **, потому что менеджер не проксирует правильный?

Если это так, мне не ясно, как правильно сделать прокси для этого класса/метода.

Следующий код воспроизводит именно это условие:

1) это файл manager.py:

from multiprocessing.managers import BaseManager
from CompositeDict import *

class PlantPurchaser():

    def __init__(self):
        self.comp  = CompositeDict('Comp')

    def get_cp(self):
        return self.comp

class Manager():

    def __init__(self):

        self.comp  = QueuePurchaser().get_cp()

        BaseManager.register('get_comp', callable=lambda:self.comp)

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.s = self.m.get_server()

        self.s.serve_forever()

2) Я хочу использовать композит в этом Consumer.py:

from multiprocessing.managers import BaseManager

class Consumer():

    def __init__(self):

        BaseManager.register('get_comp')

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.m.connect()

        self.comp = self.m.get_comp()
        ret = self.comp.addChild('consumer')

3) выполнить все запуски через controller.py:

from multiprocessing import Process

class Controller():
    def __init__(self):
        for child in _run_children():
            child.join()

def _run_children():

    from manager import Manager
    from consumer import Consumer as Consumer

procs = (
         Process(target=Manager,  name='Manager' ),
         Process(target=Consumer, name='Consumer'),
        )

for proc in procs:
    proc.daemon = 1
    proc.start()
return procs

c = Controller()

Взгляните на связанные вопросы о том, как сделать прокси для класса CompositeDict(), предложенный AlberT.

Решение, данное tgray, работает, но не может избежать условий гонки.


person DrFalk3n    schedule 24.09.2009    source источник
comment
Вы как-то меняли код, когда брали его из code.activestate? Можете ли вы опубликовать соответствующий код, который извлекает класс в процесс, а затем вызывает метод addChild()?   -  person tgray    schedule 25.09.2009
comment
Код композита абсолютно неизменен. Я собираюсь обновить соответствующий код....   -  person DrFalk3n    schedule 25.09.2009
comment
Вам нужны только 2 подпроцесса? Когда я запускаю ваш код, он продолжает запускать все больше и больше процессов, пока не выйдет из строя.   -  person tgray    schedule 25.09.2009
comment
Это сжатый код. Есть только 2 подпроцесса, потому что они необходимы для решения проблемы. Я видел ты этот рекурсивный запуск процессов?   -  person DrFalk3n    schedule 25.09.2009
comment
Я видел его в Process Explorer (sysinternals.com), но вы должны увидеть его и в диспетчере задач, если вы используете Windows.   -  person tgray    schedule 25.09.2009


Ответы (3)


Возможно ли, что между классами существует циклическая ссылка? Например, внешний класс имеет ссылку на составной класс, а составной класс имеет обратную ссылку на внешний класс.

Диспетчер многопроцессорности работает хорошо, но когда у вас большие и сложные структуры классов, вы, вероятно, столкнетесь с ошибкой, когда тип/ссылка не могут быть правильно сериализованы. Другая проблема заключается в том, что ошибки диспетчера многопроцессорности очень загадочны. Это еще больше усложняет отладку условий сбоя.

person cmcginty    schedule 24.09.2009
comment
Довольно скоро случается, что тип/ссылка не сериализованы правильно, и, да!, их трудно отлаживать. Это не похоже на циклическую ссылку, как в вашем примере, или, по крайней мере, не так очевидно. +1 за интерес к моему вопросу :-) - person DrFalk3n; 25.09.2009

Я думаю, проблема в том, что вы должны проинструктировать менеджера о том, как управлять вашим объектом, который не является стандартным типом Python.

В других мирах вы должны создать для себя прокси-сервер CompositeDict

Пример можно посмотреть в этом документе: http://ruffus.googlecode.com/svn/trunk/doc/html/sharing_data_across_jobs_example.html

person drAlberT    schedule 25.09.2009

Python имеет максимальную глубину рекурсии по умолчанию 1000 (или 999, я забыл...). Но вы можете изменить поведение по умолчанию следующим образом:

import sys
sys.setrecursionlimit(n)

Где n — количество рекурсий, которые вы хотите разрешить.

Изменить:

Приведенный выше ответ ничего не делает для решения основной причины этой проблемы (как указано в комментариях). Его нужно использовать только в том случае, если вы намеренно повторяете более 1000 раз. Если вы находитесь в бесконечном цикле (как в этой задаче), вы в конечном итоге достигнете любого установленного вами предела.

Чтобы решить вашу настоящую проблему, я переписал ваш код с нуля, начав так просто, как только мог, и построил его так, как я считаю, что вы хотите:

import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from CompositDict import *

class Shared():
    def __init__(self):
        self.comp = CompositeDict('Comp')

    def get_comp(self):
        return self.comp

    def set_comp(self, c):
        self.comp = c

class Manager():
    def __init__(self):
        shared = Shared()
        BaseManager.register('get_shared', callable=lambda:shared)
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        srv = mgr.get_server()
        srv.serve_forever()

class Consumer():
    def __init__(self, child_name):
        BaseManager.register('get_shared')
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        mgr.connect()

        shared = mgr.get_shared()
        comp = shared.get_comp()
        child = comp.addChild(child_name)
        shared.set_comp(comp)
        print comp

class Controller():
    def __init__(self):
        pass

    def main(self):
        m = Process(target=Manager, name='Manager')
        m.daemon = True
        m.start()

        consumers = []
        for i in xrange(3):
            p = Process(target=Consumer, name='Consumer', args=('Consumer_' + str(i),))
            p.daemon = True
            consumers.append(p)

        for c in consumers:
            c.start()
        for c in consumers:
            c.join()
        return 0


if __name__ == '__main__':
    con = Controller()
    sys.exit(con.main())

Я сделал это все в одном файле, но у вас не должно возникнуть проблем с его разбивкой.

Я добавил аргумент child_name вашему потребителю, чтобы проверить, обновляется ли CompositDict.

Обратите внимание, что для вашего объекта CompositDict существует как геттер, так и сеттер. Когда у меня был только геттер, каждый потребитель перезаписывал CompositDict, когда добавлял дочерний элемент.

Вот почему я также изменил ваш зарегистрированный метод на get_shared вместо get_comp, так как вам понадобится доступ к установщику, а также к получателю в вашем классе Consumer.

Кроме того, я не думаю, что вы хотите попробовать присоединиться к процессу вашего менеджера, так как он будет «служить вечно». Если вы посмотрите на исходный код BaseManager (./Lib/multiprocessing/managers.py:Line 144), вы заметите, что функция serve_forever() помещает вас в бесконечный цикл, который прерывается только KeyboardInterrupt или SystemExit.

Суть в том, что этот код работает без рекурсивного зацикливания (насколько я могу судить), но дайте мне знать, если вы все еще сталкиваетесь с ошибкой.

person tgray    schedule 24.09.2009
comment
Да, но таким образом я не могу ни избежать проблемы, ни аргументировать причину! - person DrFalk3n; 24.09.2009
comment
Это не устранит причину, даже в том случае, если замаскирует проблему. - person drAlberT; 25.09.2009
comment
Вы правы, я ответил слишком быстро, не понимая всей проблемы. - person tgray; 25.09.2009
comment
Спасибо!, +1, но из вашего кода я получаю AttributeError: объект AutoProxy[get_shared] не имеет атрибута get_comp. Почему AutoProxy не работает? см. также другие мои вопросы, связанные с stackoverflow.com/questions/1478351/ - person DrFalk3n; 28.09.2009
comment
Класс Shared() и его использование в потребителе являются ключом!!! действительно большое спасибо! - person DrFalk3n; 28.09.2009
comment
Еще одно наблюдение: с вашим кодом вы не можете избежать условий гонки между одновременными потребителями! Итак, опять же, лучше всего напрямую проксировать класс Compositedict. - person DrFalk3n; 28.09.2009
comment
Ах я вижу. Обычно я просто использую объект Queue, чтобы избежать условий гонки. - person tgray; 28.09.2009