Используйте массив numpy в общей памяти для многопроцессорности

Я хотел бы использовать массив numpy в общей памяти для использования с модулем многопроцессорности. Сложность заключается в использовании его как массива numpy, а не только как массива ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Это дает такой результат, как:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Доступ к массиву можно получить с помощью ctypes, например arr[i] имеет смысл. Однако это не массив numpy, и я не могу выполнять такие операции, как -1*arr или arr.sum(). Я полагаю, что решением было бы преобразовать массив ctypes в массив numpy. Однако (помимо того, что я не могу выполнить эту работу), я не верю, что ею больше будут делиться.

Кажется, было бы стандартное решение того, что должно быть общей проблемой.


person Ian Langmore    schedule 25.10.2011    source источник
comment
Это не то же самое, что этот? stackoverflow.com/questions/5033799/   -  person pygabriel    schedule 26.10.2011
comment
Это не совсем тот же вопрос. Связанный вопрос касается subprocess, а не multiprocessing.   -  person Andrew    schedule 21.01.2013


Ответы (5)


Чтобы добавить в ответы @ unutbu (больше не доступно) и @Henry Gomersall. Вы можете использовать shared_arr.get_lock() для синхронизации доступа, когда это необходимо:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Пример

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Если вам не нужен синхронизированный доступ или вы создаете свои собственные блокировки, mp.Array() не требуется. В этом случае вы можете использовать mp.sharedctypes.RawArray.

person jfs    schedule 26.10.2011
comment
Прекрасный ответ! Если я хочу иметь более одного разделяемого массива, каждый отдельно блокируемый, но с количеством массивов, определенным во время выполнения, является ли это прямым расширением того, что вы здесь сделали? - person Andrew; 19.01.2013
comment
@Andrew: общие массивы должны быть созданы до порождения дочерних процессов. - person jfs; 19.01.2013
comment
Хорошее замечание о порядке операций. Это то, что я имел в виду: создать указанное пользователем количество общих массивов, а затем создать несколько дочерних процессов. Это просто? - person Andrew; 19.01.2013
comment
Я задал еще один вопрос, чтобы разобраться с этой деталью: stackoverflow.com/q/14416130/513688 - person Andrew; 19.01.2013
comment
Я бы также посмотрел на режимы без копирования ZMQ. - person meawoppl; 09.08.2014
comment
@ J.F.Sebastian. Если я по какой-то причине не могу узнать размер общего массива до того, как будут созданы дочерние процессы, что тогда будет хорошим обходным решением? Я подумываю создать общий массив с размером 1, а затем отправить сообщение канала одному дочернему процессу и расширить массив до желаемого размера. Трубы создаются независимо от других операций, для которых они мне нужны. Это хороший метод? - person Chicony; 08.12.2016
comment
@Chicony: вы не можете изменить размер массива. Думайте об этом как об общем блоке памяти, который должен был быть выделен до запуска дочерних процессов. Вам не нужно использовать всю память, например, вы можете передать count в numpy.frombuffer(). Вы можете попробовать сделать это на более низком уровне, используя mmap или что-то вроде posix_ipc напрямую, чтобы реализовать изменяемый размер (может включать копирование при изменении размера) аналог RawArray (или искать существующую библиотеку). Или, если ваша задача позволяет: копировать данные по частям (если вам не нужно все сразу). Как изменить размер разделяемой памяти - хороший отдельный вопрос. - person jfs; 08.12.2016
comment
@ J.F.Sebastian Я начал отдельный вопрос, stackoverflow.com/q/41037808/4759898 - person Chicony; 08.12.2016
comment
@jfs Я хочу поделиться множеством случайных состояний родительского процесса с дочерним процессом. Я пробовал использовать Manager, но все равно не повезло. Не могли бы вы взглянуть на мой вопрос здесь и посмотрите, сможете ли вы предложить решение? Я все еще могу получать разные случайные числа, если я делаю np.random.seed(None) каждый раз, когда генерирую случайное число, но это не позволяет мне использовать случайное состояние родительского процесса, чего я не хочу. Любая помощь приветствуется. - person Amir; 20.03.2018
comment
довольно простой вопрос здесь, но произвольно ли выбрано M? Является ли M количеством процессов, которые будут выполняться параллельно? - person umop apisdn; 16.07.2018
comment
@umopapisdn: Pool() определяет количество процессов (по умолчанию используется количество доступных ядер ЦП). M - количество вызовов f() функции. - person jfs; 16.07.2018
comment
TL ; DR можно найти здесь - person Tobia Tesan; 31.10.2018
comment
Эти несколько строк shared_arr = mp.Array(ctypes.c_double, N); arr = tonumpyarray(shared_arr); arr[:] = np.random.uniform(size=N) полностью решают мою проблему (это работает в python 3.7.5) - person mathguy; 14.11.2019

Объект Array имеет связанный с ним метод get_obj(), который возвращает массив ctypes, представляющий интерфейс буфера. Я думаю, что следующее должно работать ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

При запуске выводится первый элемент a, который теперь равен 10.0, показывая, что a и b - это всего лишь два представления в одной и той же памяти.

Я считаю, что для того, чтобы убедиться, что он по-прежнему многопроцессорный, вам придется использовать методы acquire и release, которые существуют для объекта Array, a, и его встроенную блокировку, чтобы обеспечить безопасный доступ ко всему этому (хотя я не специалист по многопроцессорному модулю).

person Henry Gomersall    schedule 26.10.2011
comment
он не будет работать без синхронизации, как продемонстрировал @unutbu в своем (теперь удаленном) ответе. - person jfs; 27.10.2011
comment
Предположительно, если вы просто хотите получить доступ к постобработке массива, это можно сделать чисто, не беспокоясь о проблемах параллелизма и блокировках? - person Henry Gomersall; 27.10.2011
comment
в этом случае вам не нужно mp.Array. - person jfs; 27.10.2011
comment
Код обработки может потребовать заблокированных массивов, но интерпретация данных после обработки может не обязательно. Думаю, это происходит от понимания, в чем именно проблема. Ясно, что одновременный доступ к общим данным потребует некоторой защиты, что, как я думал, было очевидным! - person Henry Gomersall; 27.10.2011

Хотя уже даны хорошие ответы, существует гораздо более простое решение этой проблемы при соблюдении двух условий:

  1. Вы работаете в POSIX-совместимой операционной системе (например, Linux, Mac OSX); а также
  2. Вашим дочерним процессам требуется доступ только для чтения к общему массиву.

В этом случае вам не нужно возиться с явным созданием общих переменных, так как дочерние процессы будут созданы с помощью вилки. Разветвленный дочерний элемент автоматически разделяет пространство памяти родителя. В контексте многопроцессорной обработки Python это означает, что он разделяет все переменные уровня модуля; обратите внимание, что это не выполняется для аргументов, которые вы явно передаете своим дочерним процессам или функциям, которые вы вызываете для multiprocessing.Pool или около того.

Простой пример:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
person EelkeSpaak    schedule 10.06.2016
comment
+1 Действительно ценная информация. Можете ли вы объяснить, почему используются только переменные уровня модуля? Почему локальные вары не являются частью родительского пространства памяти? Например, почему это не может работать, если у меня есть функция F с локальной переменной V и функция G внутри F, которая ссылается на V? - person Coffee_Table; 27.10.2017
comment
Предупреждение: этот ответ немного обманчив. Дочерний процесс получает копию состояния родительского процесса, включая глобальные переменные, во время вилки. Состояния никоим образом не синхронизированы и с этого момента будут расходиться. Этот метод может быть полезен в некоторых сценариях (например: разветвление специальных дочерних процессов, каждый из которых обрабатывает моментальный снимок родительского процесса, а затем завершается), но бесполезен в других (например: длительные дочерние процессы, которые должны совместно использовать и синхронизировать данные с родительским процессом). - person David Stein; 07.04.2018
comment
@DavidStein: Да, я думаю, что я прямо упомянул в ответе (пункт 2), что синхронизация данных с родительским процессом (что требует большего, чем доступ только для чтения), помешает этой технике работать должным образом. В случаях, когда дочерние процессы выполняют более одной задачи и т. Д., Лучше использовать явную разделяемую память и не полагаться на семантику вилки. Однако во многих случаях трюк с разветвлением работает очень хорошо для меня (например, каждый дочерний процесс выполняет некоторые дорогостоящие вычисления на одном срезе матрицы, присутствующей в родительской памяти). - person EelkeSpaak; 08.04.2018
comment
@EelkeSpaak: Ваше утверждение - разветвленный дочерний элемент автоматически разделяет пространство памяти родителя - неверно. Если у меня есть дочерний процесс, который хочет отслеживать состояние родительского процесса строго в режиме «только для чтения», разветвление не приведет меня туда: дочерний процесс видит только моментальный снимок родительского состояния в момент разветвления. Фактически, это именно то, что я пытался сделать (следуя вашему ответу), когда обнаружил это ограничение. Отсюда постскриптум к вашему ответу. В двух словах: родительское состояние не используется совместно, а просто копируется в дочернее состояние. Это не разделение в обычном смысле. - person David Stein; 09.04.2018
comment
Я ошибаюсь, думая, что это ситуация копирования при записи, по крайней мере, в системах posix? То есть после вилки я думаю, что память используется совместно до тех пор, пока не будут записаны новые данные, после чего создается копия. Так что да, это правда, что данные не передаются точно, но это может обеспечить потенциально огромный прирост производительности. Если ваш процесс доступен только для чтения, накладных расходов на копирование не будет! Правильно ли я понял суть? - person senderle; 31.10.2018
comment
@senderle Да, это именно то, что я имел в виду! Отсюда мой пункт (2) в ответе о доступе только для чтения. - person EelkeSpaak; 01.11.2018
comment
Обратите внимание, что это работает только с multiprocessing.set_start_method('fork') (в настоящее время по умолчанию в Unix). Бывают ситуации, когда вы предпочитаете использовать spawn вместо fork (например, если вы должны использовать некоторую библиотеку с ошибками, которая плохо себя ведет после вилки). - person Albert; 18.05.2019

Я написал небольшой модуль python, который использует разделяемую память POSIX для совместного использования массивов numpy между интерпретаторами python. Может, тебе это пригодится.

https://pypi.python.org/pypi/SharedArray

Вот как это работает:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
person mat    schedule 22.10.2015

Вы можете использовать модуль sharedmem: https://bitbucket.org/cleemesser/numpy-sharedmem

Вот ваш исходный код, на этот раз с использованием общей памяти, которая ведет себя как массив NumPy (обратите внимание на дополнительный последний оператор, вызывающий функцию NumPy sum()):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
person Velimir Mlaker    schedule 25.05.2013
comment
Примечание: это больше не разрабатывается и, похоже, не работает в Linux github.com/ sturlamolden / sharedmem-numpy / issues / 4 - person A.D; 08.03.2016