заполнить массив numpy с помощью многопроцессорной обработки concurrent.futures

Я пытаюсь заполнить большой массив numpy, используя многопроцессорность. Я проработал параллельные примеры фьючерсов в документации, но не получил достаточного понимания, чтобы изменить использование.

Вот упрощенная версия того, что я хотел бы сделать:

import numpy
import concurrent.futures

squares = numpy.empty((20, 2))

def make_square(i, squares):
    print('iteration', i)
    squares[i, 0], squares[i, 1] = i, i ** 2

with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for i in range(20):
        executor.submit(make_square, i, squares)

Результат выглядит примерно так:

iteration 1
iteration 0
iteration 2
iteration 3
iteration 5
iteration 4
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 15
iteration 14
iteration 16
iteration 17
iteration 18
iteration 19

что наглядно демонстрирует, что функция выполняется одновременно. Но массив квадратов все еще пуст.

Каков правильный синтаксис для заполнения массива квадратов?

Во-вторых, будет ли использование .map лучшей реализацией?

Заранее спасибо!

02.08.17 Вау. Так что я забрел на Reddit-Land, потому что не собирался брать людей для решения этой проблемы. Так счастлив вернуться сюда, на stackoverflow. Спасибо @ilia w495 nikitin и @donkopotamus. Вот что я опубликовал в Reddit, в котором более подробно объясняется предыстория этой проблемы.

The posted code is an analogy of what I'm trying to do, which is populating 
a numpy array with a relatively simple calculation (dot product) involving 
two other arrays. The algorithm depends on a value N which can be anything 
from 1 on up, though we won't likely use a value larger than 24.

I'm currently running the algorithm on a distributed computing system and  
the N = 20 versions take longer than 10 days to complete. I'm using dozens 
of cores to obtain the required memory, but gaining none of the benefits of 
multiple CPUs. I've rewritten the code using numba which makes lower N 
variants superfast on my own laptop which can't handle the memory 
requirements for larger Ns, but alas, our distributed computing environment 
is not currently able to install numba. So I'm attempting concurrent.futures 
to take advantage of the multiple CPUs in our computing environment in the 
hopes of speeding things up.

Так что времени требуют не вычисления, а более 16 миллионов итераций. Инициализированный массив равен N x 2 ** N, то есть диапазон (16777216) в приведенном выше коде.

Может быть, просто невозможно заполнить массив посредством многопроцессорной обработки.


person zazizoma    schedule 11.07.2016    source источник
comment
Ваш массив квадратов пуст, потому что вы пытаетесь изменить его в отдельных процессах.   -  person donkopotamus    schedule 02.08.2016
comment
@zazizoma Не заполнять, а инициализировать. Есть еще одна парадигма. общие структуры данных должны быть неизменными. Я думаю, вам следует разделить свой массив на C части, где C - количество процессоров, и обрабатывать каждую часть на отдельном процессоре (процессе). Затем соедините все части, и вы получите то, что хотите. Но в некоторых случаях это неприменимо. Это зависит от вашего алгоритма. Кроме того, за отправку данных между процессами взимается отдельная плата. Например, я попробую реализовать с помощью pymp: gist.github.com/w495/6d3cd6a715e3098a С concurrent.futures будет проще.   -  person Ilia w495 Nikitin    schedule 03.08.2016
comment
Большой. Я также рассмотрю запуск скалярного произведения на нескольких процессорах и оставлю итерации линейными. Это может помочь. ДЕЙСТВИТЕЛЬНО ценю руководство.   -  person zazizoma    schedule 03.08.2016
comment
@zazizoma Это еще одна часть моего проекта, где я реализовал разделение с помощью concurrent.futures.ProcessPoolExecutor: gist.github.com / w495 / 82f7b21509a69a0d70e18f2e4ddf5ed9 Я полагаю, это тоже может вам помочь.   -  person Ilia w495 Nikitin    schedule 04.08.2016


Ответы (2)


Проблема здесь в том, что ProcessPoolExecutor будет выполнять функцию в рамках отдельного процесса.

Поскольку это отдельные процессы с отдельным пространством памяти, вы не можете ожидать, что любые изменения, которые они вносят в ваш массив (squares), будут отражены в родительском. Следовательно, ваш исходный массив не изменился (как вы обнаруживаете).

Вам необходимо выполнить одно из следующих действий:

  • используйте ThreadPoolExecutor, но имейте в виду, что в общем случае вы все равно не должны пытаться изменять глобальные переменные в нескольких потоках;
  • переделайте свой код, чтобы ваш процесс / поток выполнял какие-то (дорогостоящие) вычисления и возвращал результат.

Последний будет выглядеть так:

squares = numpy.zeros((20, 2))

def make_square(i):
    print('iteration', i)

    # compute expensive data here ...

    # return row number and the computed data
    return i, ([i, i**2])

with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for row, result in executor.map(make_square, range(20)):
        squares[row] = result

Это даст ожидаемый результат:

[[   0.    0.]
 [   1.    1.]
 [   2.    4.]
 ...
 [  18.  324.]
 [  19.  361.]]
person donkopotamus    schedule 02.08.2016
comment
Но какова причина повторного использования этой (квадратов) переменной? Это не гарантирует более низкого использования памяти. Более того, повторная отправка ненужных данных через процессы - не лучшая идея. В этом случае вы можете получить row через enumerate. - person Ilia w495 Nikitin; 02.08.2016

А хороший пример, я полагаю, это вам поможет:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ProcessPoolExecutor(3)

future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(2)
print(future.done())
sleep(2)
print(future.done())
sleep(2)
print(future.done())
print("Result: " + future.result())

Будущее - это только обещание что-то сделать. Итак, я вижу ваш код следующим образом:

import concurrent.futures
import itertools
import os
import time

import numpy

SQUARE_LIST_SIZE = 20


def main():
    # Creates empty array.
    square_list = numpy.empty((SQUARE_LIST_SIZE, 2))

    # Creates a sequence (generator) of promises
    future_seq = make_future_seq(square_list)

    # Creates a sequence (generator) of computed square.
    square_seq = make_square_seq(future_seq)

    # Creates a sequence (generator) of computed square.
    square_list = list(square_seq)

    return square_list


def make_future_seq(squares):
    """
        Generates the sequence of empty a promises.
        Creates a new process only on `submit`.
    """

    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for i in range(SQUARE_LIST_SIZE):
            # Only makes a promise to do something.
            future = executor.submit(make_one_square, i, squares)
            print('future ', i, '= >', future)
            yield future


def make_square_seq(future_seq):
    """
        Generates the sequence of fulfilled a promises.
    """

    # Just to copy iterator
    for_show_1, for_show_2, future_seq = itertools.tee(future_seq, 3)

    # Let's check it, May be it withdrawn =)
    for i, future in enumerate(for_show_1):
        print('future ', i, 'done [1] =>', future.done())

    # Try to keep its promises
    for future in future_seq:
        yield future.result()

    # Let's check it one more time. It is faithful to!
    for i, future in enumerate(for_show_2):
        print('future ', i, 'done [2] =>', future.done())

    return future_seq


def make_one_square(i, squares):
    print('inside [1] = >', i, 'pid = ', os.getpid())
    squares[i, 0], squares[i, 1] = i, i ** 2

    time.sleep(1)  # Long and hard computation.

    print('inside [2]= >', i, 'pid = ', os.getpid())
    return squares


if __name__ == '__main__':
    main()

Слишком много букв. Это просто для объяснения. Это зависит от обстоятельств, но для многих реальных примеров требуется только future.result() вызов. Проверьте эту страницу: concurrent.futures.html

Итак, этот код сгенерирует что-то вроде этого:

$ python test_futures_1.py 
future  0 = > <Future at 0x7fc0dc758278 state=running>
future  0 done [1] => False
future  1 = > <Future at 0x7fc0dc758da0 state=pending>
inside [1] = > 0 pid =  19364
future  1 done [1] => False
inside [1] = > 1 pid =  19365
future  2 = > <Future at 0x7fc0dc758e10 state=pending>
future  2 done [1] => False
future  3 = > <Future at 0x7fc0dc758cc0 state=pending>
inside [1] = > 2 pid =  19366
future  3 done [1] => False
future  4 = > <Future at 0x7fc0dc769048 state=pending>
future  4 done [1] => False
inside [1] = > 3 pid =  19367
future  5 = > <Future at 0x7fc0dc758f60 state=running>
future  5 done [1] => False
future  6 = > <Future at 0x7fc0dc758fd0 state=pending>
future  6 done [1] => False
future  7 = > <Future at 0x7fc0dc7691d0 state=pending>
future  7 done [1] => False
future  8 = > <Future at 0x7fc0dc769198 state=pending>
future  8 done [1] => False
future  9 = > <Future at 0x7fc0dc7690f0 state=pending>
future  9 done [1] => False
future  10 = > <Future at 0x7fc0dc769438 state=pending>
future  10 done [1] => False
future  11 = > <Future at 0x7fc0dc7694a8 state=pending>
future  11 done [1] => False
future  12 = > <Future at 0x7fc0dc769550 state=pending>
future  12 done [1] => False
future  13 = > <Future at 0x7fc0dc7695f8 state=pending>
future  13 done [1] => False
future  14 = > <Future at 0x7fc0dc7696a0 state=pending>
future  14 done [1] => False
future  15 = > <Future at 0x7fc0dc769748 state=pending>
future  15 done [1] => False
future  16 = > <Future at 0x7fc0dc7697f0 state=pending>
future  16 done [1] => False
future  17 = > <Future at 0x7fc0dc769898 state=pending>
future  17 done [1] => False
future  18 = > <Future at 0x7fc0dc769940 state=pending>
future  18 done [1] => False
future  19 = > <Future at 0x7fc0dc7699e8 state=pending>
future  19 done [1] => False
inside [2]= > 0 pid =  19364
inside [2]= > 1 pid =  19365
inside [1] = > 4 pid =  19364
inside [2]= > 2 pid =  19366
inside [1] = > 5 pid =  19365
inside [1] = > 6 pid =  19366
inside [2]= > 3 pid =  19367
inside [1] = > 7 pid =  19367
inside [2]= > 4 pid =  19364
inside [2]= > 5 pid =  19365
inside [2]= > 6 pid =  19366
inside [1] = > 8 pid =  19364
inside [1] = > 9 pid =  19365
inside [1] = > 10 pid =  19366
inside [2]= > 7 pid =  19367
inside [1] = > 11 pid =  19367
inside [2]= > 8 pid =  19364
inside [2]= > 9 pid =  19365
inside [2]= > 10 pid =  19366
inside [2]= > 11 pid =  19367
inside [1] = > 13 pid =  19366
inside [1] = > 12 pid =  19364
inside [1] = > 14 pid =  19365
inside [1] = > 15 pid =  19367
inside [2]= > 14 pid =  19365
inside [2]= > 13 pid =  19366
inside [2]= > 12 pid =  19364
inside [2]= > 15 pid =  19367
inside [1] = > 16 pid =  19365
inside [1] = > 17 pid =  19364
inside [1] = > 18 pid =  19367
inside [1] = > 19 pid =  19366
inside [2]= > 16 pid =  19365
inside [2]= > 18 pid =  19367
inside [2]= > 17 pid =  19364
inside [2]= > 19 pid =  19366
future  0 done [2] => True
future  1 done [2] => True
future  2 done [2] => True
future  3 done [2] => True
future  4 done [2] => True
future  5 done [2] => True
future  6 done [2] => True
future  7 done [2] => True
future  8 done [2] => True
future  9 done [2] => True
future  10 done [2] => True
future  11 done [2] => True
future  12 done [2] => True
future  13 done [2] => True
future  14 done [2] => True
future  15 done [2] => True
future  16 done [2] => True
future  17 done [2] => True
future  18 done [2] => True
future  19 done [2] => True
person Ilia w495 Nikitin    schedule 02.08.2016
comment
Да, но, как я понимаю, проблема не в инициализации squares. Полагаю, настоящая проблема заключается в понимании того, как работать с PoolExecutor. - person Ilia w495 Nikitin; 02.08.2016