Объедините Pool.map с массивом разделяемой памяти в многопроцессорной обработке Python

У меня есть очень большой (только для чтения) массив данных, который я хочу обрабатывать несколькими процессами параллельно.

Мне нравится функция Pool.map, и я хотел бы использовать ее для параллельного вычисления функций с этими данными.

Я видел, что можно использовать класс Value или Array для использования данных общей памяти между процессами. Но когда я пытаюсь использовать это, я получаю RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance при использовании функции Pool.map:

Вот упрощенный пример того, что я пытаюсь сделать:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Может ли кто-нибудь сказать мне, что я здесь делаю не так?

Итак, я хотел бы передать информацию о вновь созданном массиве с распределенной общей памятью процессам после того, как они были созданы в пуле процессов.


person Jeroen Dirks    schedule 04.11.2009    source источник
comment
К сожалению, это невозможно. Согласно документации mp рекомендуется использовать наследование (на форк-платформах). Для данных только для чтения, как здесь, обычно используется глобальный массив, но можно использовать общий массив для обмена данными для чтения / записи. Форкинг обходится недорого, поэтому вы можете воссоздавать пул всякий раз, когда получаете данные, а затем закрывать его. К сожалению, в Windows это невозможно - обходной путь заключается в использовании массива общей памяти (даже в случае только для чтения), но это может быть передано подпроцессам только при создании процесса (я полагаю, их нужно добавить в список доступа. ...   -  person robince    schedule 13.11.2009
comment
для сегмента разделяемой памяти и что эта логика реализуется только при запуске подпроцесса). Вы можете передать общий массив данных при запуске пула, как я показал, или в процесс аналогичным образом. Вы не можете передать массив разделяемой памяти в открытый пул - вы должны создать пул после памяти. Простые способы решения этой проблемы включают выделение буфера максимального размера или просто выделение массива, когда вы знаете требуемый размер перед запуском пула. Если вы держите свои глобальные переменные вниз, пул не должен быть слишком дорогим и для Windows - глобальные переменные автоматически ...   -  person robince    schedule 13.11.2009
comment
маринованные и отправленные в подпроцессы - вот почему я предлагаю сделать один буфер достаточного размера в начале (где, надеюсь, ваше количество глобальных переменных мало), а затем Pool, лучше. Я нашел время, чтобы понять и добросовестно решить вашу проблему - до того, как вы отредактировали свой вопрос - поэтому, хотя я понимаю, если вы хотите, чтобы он работал, я надеюсь, что в конце вы подумаете о том, чтобы принять мой ответ, если не будет ничего существенно другого / лучшего вместе.   -  person robince    schedule 13.11.2009
comment
Я более внимательно изучил исходный код, и информацию об общей памяти можно обработать (это необходимо для передачи информации об этом клиентскому процессу в Windows), но в этом коде есть утверждение, которое запускается только во время порождения процесса. Интересно, почему это так.   -  person Jeroen Dirks    schedule 13.11.2009


Ответы (4)


Пытаюсь снова, потому что я только что увидел награду;)

В основном я думаю, что сообщение об ошибке означает то, о чем говорилось - многопроцессорные массивы разделяемой памяти не могут быть переданы в качестве аргументов (путем травления). Нет смысла сериализовать данные - дело в том, что данные являются разделяемой памятью. Итак, вам нужно сделать общий массив глобальным. Я думаю, что лучше использовать его как атрибут модуля, как в моем первом ответе, но просто оставить его как глобальную переменную в вашем примере также хорошо. Принимая во внимание вашу точку зрения о нежелании устанавливать данные перед форком, вот модифицированный пример. Если вы хотите иметь более одного возможного общего массива (и именно поэтому вы хотели передать toShare в качестве аргумента), вы могли бы аналогичным образом создать глобальный список общих массивов и просто передать индекс в count_it (который стал бы for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[РЕДАКТИРОВАТЬ: Вышеупомянутое не работает в Windows из-за того, что не используется fork. Однако приведенное ниже работает в Windows, все еще используя Pool, поэтому я думаю, что это наиболее близко к тому, что вы хотите:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Не уверен, почему карта не обрабатывает массив, а процесс и пул - я думаю, возможно, он был передан в точке инициализации подпроцесса в Windows. Обратите внимание, что данные все еще устанавливаются после вилки.

person robince    schedule 12.11.2009
comment
Даже на платформах с вилкой вы не можете вставлять новые общие данные в toShare после вилки, поскольку в этот момент каждый процесс будет иметь свою собственную независимую копию. - person Jeroen Dirks; 12.11.2009
comment
Итак, настоящая проблема, похоже, заключается в том, как мы можем собрать информацию о массиве, чтобы ее можно было отправлять и подключать из другого процесса. - person Jeroen Dirks; 12.11.2009
comment
@ Джеймс - нет, это неправильно. Массив должен быть настроен перед форком, но тогда это общая память, которую можно изменить, и изменения будут видны для всех дочерних элементов. Посмотрите на пример - я помещаю данные в массив после вилки (которая возникает при создании экземпляра Pool ()). Эти данные могут быть получены во время выполнения, после разветвления, и до тех пор, пока они помещаются в предварительно выделенный сегмент разделяемой памяти, они могут быть скопированы туда и просмотрены всеми дочерними элементами. - person robince; 12.11.2009
comment
Вы можете мариновать массив, но не используя пул. - person jwilson; 12.11.2009
comment
Отредактировано, чтобы добавить рабочую версию Windows, используя только Pool (путем передачи общего массива в качестве параметра инициализации. - person robince; 12.11.2009
comment
Вы приближаетесь, но все еще остается проблема, заключающаяся в том, что длину массива toShare необходимо исправить до создания пула. Таким образом, вы все еще создаете сегмент разделяемой памяти до создания процессов. Что я действительно хочу видеть в качестве общего решения, так это способ создания нового общего массива переменной длины после создания пула, передачи информации о нем рабочему процессу и чтения из него. - person Jeroen Dirks; 12.11.2009
comment
Боюсь, что с Pool такое невозможно. Вы должны заранее создать общую память. - person robince; 12.11.2009
comment
В любом случае это кажется искусственным требованием. Если размер нового набора данных не соответствует размеру текущего общего буфера - вы можете просто закрыть пул (pool.close()), создать новый общий массив необходимого размера и открыть новый пул. Для любых вычислительных задач, где использование многопроцессорности того стоит, накладные расходы на закрытие и открытие пула будут крошечными. А операции с пулом относительно атомарны, так что нельзя вводить свежие данные в середине команды карты. - person robince; 12.11.2009
comment
Утверждение о травлении общего массива данных кажется искусственным ограничением использования общего ресурса с множественной обработкой, но с учетом этого ограничения вы предоставили некоторые разумные обходные пути, поэтому я дам вам баллы за принятый ответ. - person Jeroen Dirks; 13.11.2009

Если данные доступны только для чтения, просто сделайте их переменной в модуле перед вилкой из Pool. Тогда все дочерние процессы должны иметь к нему доступ, и он не будет скопирован, если вы не напишете ему.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Если вы действительно хотите попробовать использовать Array, вы можете попробовать с аргументом ключевого слова lock=False (по умолчанию это правда).

person robince    schedule 04.11.2009
comment
Я не верю, что использование глобальных переменных безопасно и определенно не будет работать в окнах, где процессы не разветвляются. - person Jeroen Dirks; 04.11.2009
comment
Как это не безопасно? Если вам нужен только доступ для чтения к данным, это нормально. Если вы напишете на него по ошибке, то измененная страница будет скопирована при записи для дочернего процесса, поэтому ничего плохого не произойдет (например, не будет мешать другим процессам). Вы правы, это не сработает с окнами ... - person robince; 04.11.2009
comment
Вы правы, что это безопасно на платформах, основанных на форке. Но я хотел бы знать, существует ли способ обмена большими объемами данных на основе общей памяти после создания пула процессов. - person Jeroen Dirks; 05.11.2009

Проблема, которую я вижу, заключается в том, что Pool не поддерживает сбор общих данных через свой список аргументов. Это то, что сообщение об ошибке означает, что «объекты должны совместно использоваться между процессами только посредством наследования». Общие данные должны быть унаследованными, то есть глобальными, если вы хотите поделиться ими с помощью класса Pool.

Если вам нужно передать их явно, вам, возможно, придется использовать multiprocessing.Process. Вот ваш переработанный пример:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Вывод: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

Порядок элементов очереди может отличаться.

Чтобы сделать это более общим и похожим на Pool, вы можете создать фиксированное количество процессов N, разделить список ключей на N частей, а затем использовать функцию-оболочку в качестве цели процесса, которая будет вызывать count_it для каждого ключа в списке. он передается, например:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
person jwilson    schedule 10.11.2009

Если вы видите:

RuntimeError: синхронизированные объекты должны совместно использоваться между процессами только через наследование.

Рассмотрите возможность использования multiprocessing.Manager, поскольку это не так. есть это ограничение. Менеджер работает, считая, что он предположительно работает в отдельном процессе.

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
person Acumenus    schedule 02.10.2019
comment
это было единственное предложение, которое я действительно получил при использовании multiprocessing.Pool ... и мне не требовалось явное обращение с manager.Lock - person raphael; 25.03.2020
comment
@raphael Вы утверждаете, что значение имеет неявную блокировку? Явная блокировка предназначена для предотвращения состояния гонки и, таким образом, предотвращения ошибочного подсчета при обновлении счетчика из нескольких процессов. - person Acumenus; 30.10.2020