Получение объектов Queue должно использоваться только между процессами через наследование, но я не использую Queue

Я пытаюсь использовать ProcessPoolExecutor, но получаю сообщение об ошибке «Объекты очереди должны быть разделены между процессами только через наследование», но я не использую очередь (по крайней мере, явно). Я не могу найти ничего, что объясняет, что я делаю неправильно.

Вот некоторый код, демонстрирующий проблему (не мой фактический код):

from concurrent.futures import ProcessPoolExecutor, as_completed

class WhyDoesntThisWork:

    def __init__(self):
        self.executor = ProcessPoolExecutor(4)

    def execute_something(self, starting_letter):
        futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
        letter = None
        for future in as_completed(futures):
            letter = future.result()
        print(letter)

    def something(self, letter, d):
        # do something pointless for the example
        for x in range(d):
            letter = chr(ord(letter) + 1)

if __name__ == '__main__':
    WhyDoesntThisWork(). execute_something('A')

Эль Русо указал, что если сделать something() статическим или классовым методом, ошибка исчезнет. К сожалению, мой фактический код должен вызывать другие методы, используя self.


person ThatAintWorking    schedule 07.11.2017    source источник
comment
На самом деле вы используете две очереди! Объяснение: все объекты, переданные в concurrent.futures.ProcessPoolExecutor, сериализуются. Поэтому, когда вы отправляете метод self.something (который является объектом, который хранит self в своем атрибуте __self__, см. дескрипторы), self тоже сериализуется, поэтому он должен быть сериализуемым. Но из-за атрибута self.executor это не так, поскольку concurrent.futures.ProcessPoolExecutor имеет базовый вызов multiprocessing.Queue и результат multiprocessing.Queue.   -  person Maggyero    schedule 09.07.2019


Ответы (2)


попробуйте этот код для something

@staticmethod
def something(letter, d):
    # do something pointless for the example
    for x in range(d):
        letter = chr(ord(letter) + 1)

или рефакторинг для:

from concurrent.futures import ProcessPoolExecutor, as_completed


class WhyDoesntThisWork:
    def something(self, letter, d):
        # do something pointless for the example
        for x in range(d):
            letter = chr(ord(letter) + 1)
        return letter


if __name__ == '__main__':
    executor = ProcessPoolExecutor(4)
    letter = 'A'
    obj = WhyDoesntThisWork()
    futures = [executor.submit(obj.something, letter, d) for d in range(4)]
    for future in as_completed(futures):
        print(future.result())
person El Ruso    schedule 07.11.2017
comment
Но в моем реальном коде он не может быть статичным, потому что он вызывает другие методы для самого себя. - person ThatAintWorking; 07.11.2017
comment
classmethod тоже можно использовать. - person El Ruso; 07.11.2017
comment
Так почему же вызов обычного метода экземпляра вызывает ошибку? - person ThatAintWorking; 07.11.2017
comment
Я до сих пор не понимаю, почему исходный код не работает, но я смог реорганизовать свой код, чтобы сделать все методы, которые нужно было называть методами класса. Это решает мою проблему. Спасибо!! - person ThatAintWorking; 07.11.2017
comment
Я тоже не знаю причин такого поведения - person El Ruso; 07.11.2017

Может быть решена без использования статического подхода.

При использовании process каждый процесс выполняется в независимом пространстве памяти. Это отличается от использования потока, когда разные потоки выполняются в рамках одного и того же процесса, используя одно и то же пространство памяти. Таким образом, ошибка возникает не при использовании ThreadPoolExecutor, а при использовании ProcessPoolExecutor.

Таким образом, когда функция экземпляра класса доставляется в отдельные подпроцессы, механизм многопроцессорной обработки собирает функцию, чтобы функцию можно было передать в подпроцесс как независимый экземпляр. И когда подпроцесс присоединился, класс обновляется экземпляром функции, доставленным обратно как невыбранный.

Чтобы заставить его работать, просто добавьте в класс функции __getstate__() и __setstate__(), чтобы указать классу, как выбирать и распаковывать функцию. При травлении ненужные поля можно исключить, как показано на del self_dict['executor'].

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


class GuessItWorksNow():

    def __init__(self):
        self.executor = ProcessPoolExecutor(4)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['executor']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def something(self, letter, d):
        # do something pointless for the example
        p = multiprocessing.current_process()
        time.sleep(1)
        for x in range(d):
            letter = chr(ord(letter) + 1)
        return (f'[{p.pid}] ({p.name}) ({letter})')

    def execute_something(self, starting_letter):
        futures = [self.executor.submit(self.something, starting_letter, d) for d in range(10)]
        for future in as_completed(futures):
            print(future.result())


if __name__ == '__main__':
    obj = GuessItWorksNow()
    obj.execute_something('A')
person Hoyeung    schedule 27.08.2018