multiprocessing pool.map функции вызова в определенном порядке

Как я могу заставить multiprocessing.pool.map распределять процессы в числовом порядке?


Дополнительная информация:
У меня есть программа, которая обрабатывает несколько тысяч файлов данных, составляя график каждого из них. Я использую multiprocessing.pool.map для передачи каждого файла процессору, и он отлично работает. Иногда это занимает много времени, и было бы неплохо смотреть на выходные изображения во время работы программы. Это было бы намного проще, если бы процесс карты распределял снимки по порядку; вместо этого для конкретного прогона, который я только что выполнил, первые 8 проанализированных снимков были: 0, 78, 156, 234, 312, 390, 468, 546. Есть ли способ заставить их распределять их более близко по порядку номеров?


Пример:
Вот пример кода, который содержит те же ключевые элементы и показывает тот же базовый результат:

import sys
from multiprocessing import Pool
import time

num_proc  = 4; num_calls = 20; sleeper   = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print "%5d" % (arg),
    sys.stdout.flush()     # otherwise doesn't print properly on single line

proc_pool = Pool(num_proc)
proc_pool.map( SomeFunc, range(num_calls) )

Урожайность:

   0  4  2  6   1   5   3   7   8  10  12  14  13  11   9  15  16  18  17  19

Отвечать:

От @Hayden: используйте параметр chunksize, def map(self, func, iterable, chunksize=None).

Дополнительная информация:
chunksize определяет, сколько итераций выделяется каждому процессору за раз. В моем примере выше, например, используется размер фрагмента 2 --- что означает, что каждый процессор отключается и выполняет свою задачу в течение 2 итераций функции, а затем возвращается для большего количества («проверка»). Компромисс размера chunksize заключается в том, что существуют накладные расходы на «регистрацию», когда процессор должен синхронизироваться с остальными, что говорит о том, что вам нужен большой размер chunksize. С другой стороны, если у вас есть большие фрагменты, то один процессор может завершить свой фрагмент, в то время как другому остается много времени, поэтому вам следует использовать небольшой размер фрагмента. Я предполагаю, что дополнительная полезная информация - это то, какой диапазон есть, сколько времени может занять каждый вызов функции. Если они действительно должны занимать одинаковое количество времени - гораздо эффективнее использовать большой размер блока. С другой стороны, если вызовы некоторых функций могут занимать в два раза больше времени, чем другие, вам нужен небольшой размер фрагментов, чтобы процессоры не застали в ожидании.

Для моей проблемы каждый вызов функции должен занимать примерно одинаковое количество времени (я думаю), поэтому, если я хочу, чтобы процессы вызывались по порядку, я собираюсь пожертвовать эффективностью из-за накладных расходов на регистрацию.


person DilithiumMatrix    schedule 27.07.2013    source источник
comment
Что вы используете для построения данных?   -  person satoru    schedule 28.07.2013
comment
@ Satoru.Logic Я не понимаю, насколько это актуально, но изнутри SomeFunc я бы вызвал другую функцию, например PlotFunc(), который создает изображение с matplotlib и pyplot и сохраняет его на диск.   -  person DilithiumMatrix    schedule 28.07.2013
comment
Можно ли выполнить предварительную обработку параллельно, а затем последовательно построить график?   -  person satoru    schedule 28.07.2013
comment
Ах, я понимаю - нет, большая часть вычислительного времени уходит только на чтение файлов, поэтому мне пришлось бы дождаться завершения 90% общего времени, прежде чем что-либо строить.   -  person DilithiumMatrix    schedule 28.07.2013


Ответы (2)


Причина, по которой это происходит, заключается в том, что каждому процессу дается предопределенный объем работы, который нужно выполнить в начале вызова карты, который зависит от chunksize. Мы можем определить chunksize по умолчанию, посмотрев на источник pool.map

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
  chunksize += 1

Таким образом, для диапазона 20 и 4 процессов мы получим chunksize из 2.

Если мы изменим ваш код, чтобы отразить это, мы должны получить результаты, аналогичные результатам, которые вы получаете сейчас:

proc_pool.map(SomeFunc, range(num_calls), chunksize=2)

Это дает результат:

0 2 6 4 1 7 5 3 8 10 12 14 9 13 15 11 16 18 17 19

Теперь установка chunksize=1 гарантирует, что каждому процессу в пуле будет выдаваться только одна задача за раз.

proc_pool.map(SomeFunc, range(num_calls), chunksize=1)

Это должно обеспечить достаточно хороший числовой порядок по сравнению с тем, когда не указывается размер блока. Например, размер фрагмента 1 дает результат:

0 1 2 3 4 5 6 7 9 10 8 11 13 12 15 14 16 17 19 18

person Hayden    schedule 28.07.2013
comment
Потрясающий! Спасибо. Есть ли у вас какое-нибудь представление о том, почему они выбрали именно этот алгоритм? Или вообще какие факторы следует учитывать при оптимизации производительности? (Не похоже, что в моем случае chunksize каким-либо образом повлияет на производительность) - person DilithiumMatrix; 28.07.2013
comment
Могу я задать вопрос? Установив chuncksize = 1, будет ли этот алгоритм одновременного запуска функции pool.map? или будет мало рабочих, но они будут делать дела по порядку? Я не совсем понимаю блок в документации .. - person Oldyoung; 04.08.2016

А как насчет изменения map на imap:

import os
from multiprocessing import Pool
import time

num_proc = 4
num_calls = 20
sleeper = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print "%s %5d" % (os.getpid(), arg)
    return arg

proc_pool = Pool(num_proc)
list(proc_pool.imap(SomeFunc, range(num_calls)))

Причина может быть в том, что chunksize для imap по умолчанию равно 1, поэтому он не может работать до map.

person satoru    schedule 28.07.2013