Процесс Python не очищен для повторного использования

Процессы не очищаются для повторного использования

Всем привет,

Я наткнулся на проблему с ProcessPoolExecutor, где процессы получают доступ к данным, у них не должно быть возможности. Позволь мне объяснить:

У меня ситуация, аналогичная приведенному ниже примеру: у меня есть несколько прогонов для запуска с разными аргументами. Они вычисляют свои вещи параллельно, и у них нет причин взаимодействовать друг с другом. Теперь, насколько я понимаю, когда процесс разветвляется, он дублирует себя. Дочерний процесс имеет те же данные (в памяти), что и его родитель, но если он что-то меняет, он делает это в своей собственной копии. Если бы я хотел, чтобы изменения сохранялись в течение всего времени жизни дочернего процесса, я бы вызывал очереди, каналы и другие вещи IPC.

Но я на самом деле нет! Каждый из процессов манипулирует данными для себя, которые не должны переноситься ни в один из других запусков. Однако приведенный ниже пример показывает обратное. Следующие запуски (не параллельные) могут получить доступ к данным своего предыдущего запуска, подразумевая, что данные не были удалены из процесса.

Код/пример

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)

    # Check value
    if Static.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static.integer = run + 1

def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)

if __name__ == '__main__':
    set_start_method("fork")    # enforce fork
    pooling()

Выход

[1998 MainProcess] Start
[ 0 2020 Process-1] int: 0
[ 2 2020 Process-1] int: 1
[ 1 2021 Process-2] int: 0
[ 3 2021 Process-2] int: 2
run #0 finished
run #1 finished
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "<stdin>", line 14, in inprocess
Exception: [ 2 2020 Process-1] Variable already set!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 29, in <module>
  File "<stdin>", line 24, in pooling
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
Exception: [ 2 2020 Process-1] Variable already set!

Это поведение также можно воспроизвести с помощью max_workers=1, так как процесс используется повторно. Метод запуска не влияет на ошибку (хотя кажется, что только "fork" использует более одного процесса).


Итак, подведем итог: я хочу, чтобы каждый новый запуск в процессе содержал все предыдущие данные, но не новые данные из любого другого запуска. Это возможно? Как бы я этого добился? Почему вышеизложенное не делает именно это?

Я ценю любую помощь.


Я нашел multiprocessing.pool.Pool, где можно установить maxtasksperchild=1, поэтому рабочий процесс уничтожается, когда его задача завершена. Но мне не нравится интерфейс multiprocessing; ProcessPoolExecutor удобнее в использовании. Кроме того, вся идея пула состоит в том, чтобы сэкономить время настройки процесса, которое будет упущено при уничтожении хост-процесса после каждого запуска.


person Dave J    schedule 21.06.2018    source источник


Ответы (2)


Совершенно новые процессы в python не разделяют состояние памяти. Однако ProcessPoolExecutor повторно использует экземпляры процесса. В конце концов, это пул активных процессов. Я предполагаю, что это сделано для предотвращения накладных расходов ОС на постоянное наклонение и запуск процессов.

Вы видите такое же поведение в других технологиях распространения, таких как сельдерей, где, если вы не будете осторожны, вы можете удалить глобальное состояние между выполнениями.

Я рекомендую вам лучше управлять своим пространством имен, чтобы инкапсулировать ваши данные. Используя ваш пример, вы можете, например, инкапсулировать свой код и данные в родительском классе, который вы создаете в inprocess(), вместо того, чтобы хранить его в общем пространстве имен, таком как статическое поле в классах или непосредственно в модуле. Таким образом, объект в конечном итоге будет очищен сборщиком мусора:

class State:
    def __init__(self):
        self.integer: int = 0

    def do_stuff():
        self.integer += 42

def use_global_function(state):
    state.integer -= 1664
    state.do_stuff()

def inprocess(run: int) -> None:
    cp = current_process()
    state = State()
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {state.integer}", flush=True)
    if state.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
    state.integer = run + 1
    state.do_stuff()
    use_global_function(state)
person André C. Andersen    schedule 21.06.2018
comment
Благодарю. относительно повторного использования процесса: я предполагал то же самое. относительно вашего предложения: к сожалению, это не работает для меня. я должен был найти пример без всех фактических накладных расходов. я полагаюсь на метаклассы, чтобы уменьшить мой API, и в результате я не могу создавать экземпляры этих вещей в вызове inprocess. - person Dave J; 21.06.2018

Я сталкивался с некоторыми потенциально похожими проблемами и видел несколько интересных сообщений в этом High Memory Usage. Используя Python Multiprocessing, это указывает на использование gc.collector(), однако в вашем случае это не сработало. Итак, я подумал о том, как был инициализирован класс Static, некоторые моменты:

  1. К сожалению, я не могу воспроизвести ваш минимальный пример, который запрашивает ошибка значения: ValueError: не удается найти контекст для «форка»
  2. Учитывая 1, я использую set_start_method(spawn). Быстрое решение может состоять в том, чтобы инициализировать каждый раз статический класс, как показано ниже:
{
    class Static:
        integer: int = 0
        def __init__(self):
            pass
    
    def inprocess(run: int) -> None:
        cp = current_process()
        # Print current state
        print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static().integer}", flush=True)
    
        # Check value
        if Static().integer != 0:
            raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
    
        # Update value
        Static().integer = run + 1
    
    
    def pooling():
        cp = current_process()
        # Get master's pid
        print(f"[{cp.pid} {cp.name}] Start")
        with ProcessPoolExecutor(max_workers=2) as executor:
            for i, _ in enumerate(executor.map(inprocess, range(4))):
                print(f"run #{i} finished", flush=True)
    
    
    if __name__ == "__main__":
        print("start")
        # set_start_method("fork")  # enforce fork , ValueError: cannot find context for 'fork'
        set_start_method("spawn")    # Alternative
        pooling()
}

Это возвращает:

[ 0 1424 SpawnProcess-2] int: 0
[ 1 1424 SpawnProcess-2] int: 0
run #0 finished
[ 2 17956 SpawnProcess-1] int: 0
[ 3 1424 SpawnProcess-2] int: 0
run #1 finished
run #2 finished
run #3 finished
person DTK    schedule 21.02.2021