Недавно я начал экспериментировать с многопроцессорностью, чтобы ускорить задачу. Я создал скрипт, который выполняет нечеткое сопоставление строк и вычисляет баллы с использованием разных алгоритмов (я хотел сравнить разные методы сопоставления). Полный исходный код можно найти здесь: https://bitbucket.org/bergonzzi/fuzzy-compare/src. На вход подаются 2 файла, которые объединены в пары (каждая строка файла1 с каждой строкой файла2). Для каждой пары вычисляются нечеткие оценки соответствия.
Я сделал 3 версии. Работая с образцами данных, представленными в моем репозитории (который состоит из 697 340 элементов после объединения в пары), у меня есть следующие тайминги:
- Простой одиночный процесс - 0:00:47
- Мультипроцесс с использованием Pool.map() — 0:00:13
- Многопроцессорность с использованием очередей (шаблон производитель/потребитель) — 0:01:04
Я пытаюсь понять, почему моя версия Pool.map() намного быстрее, чем моя версия Queue, которая на самом деле медленнее, чем простая версия с одним процессом.
Моя причина даже попытки использовать очереди заключается в том, что версия Pool.map() удерживает результаты до тех пор, пока все не будет завершено, и записывает в файл только в конце. Это означает, что для больших файлов требуется много памяти. я говорю об этой версии (ссылка на нее, потому что это много кода для вставки сюда).
Чтобы решить эту проблему, я преобразовал его в шаблон производителя/потребителя (или по крайней мере пытался). Здесь я сначала создаю задания, комбинируя оба входных файла и помещая их в очередь, которую обрабатывают потребители (вычисляя оценки нечеткого совпадения). Выполненные работы помещаются в очередь. Затем у меня есть один процесс, который берет выполненные элементы из этой очереди и записывает их в файл. Таким образом, теоретически мне не понадобится столько памяти, так как результаты будут сброшены на диск. Вроде работает нормально, но намного медленнее. Я также заметил, что 4 процесса, которые я создаю, похоже, не используют 100% ЦП при просмотре монитора активности в Mac OSX (что не относится к версии Pool.map()).
Еще одна вещь, которую я заметил, это то, что моя функция-производитель, кажется, правильно заполняет очередь, но процессы-потребители, похоже, ждут, пока очередь не заполнится, вместо того, чтобы начать работать, как только прибудет первый элемент. Я наверное что-то там не так делаю...
Для справки вот некоторый соответствующий код для версии Queue (хотя лучше посмотреть полный код в репозитории, указанном выше).
Вот моя функция производителя:
def combine(list1, list2):
'''
Combine every item of list1 with every item of list 2,
normalize put the pair in the job queue.
'''
pname = multiprocessing.current_process().name
for x in list1:
for y in list2:
# slugify is a function to normalize the strings
term1 = slugify(x.strip(), separator=' ')
term2 = slugify(y.strip(), separator=' ')
job_queue.put_nowait([term1, term2])
Это функция записи:
def writer(writer_queue):
out = open(file_out, 'wb')
pname = multiprocessing.current_process().name
out.write(header)
for match in iter(writer_queue.get, "STOP"):
print("%s is writing %s") % (pname, str(match))
line = str(';'.join(match) + '\n')
out.write(line)
out.close()
Это рабочая функция, которая выполняет фактические вычисления (убрана большая часть кода, поскольку здесь это не имеет значения, полный источник в репо):
def score_it(job_queue, writer_queue):
'''Calculate scores for pair of words.'''
pname = multiprocessing.current_process().name
for pair in iter(job_queue.get_nowait, "STOP"):
# do all the calculations and put the result into the writer queue
writer_queue.put(result)
Вот как я настроил процессы:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()
workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []
print('Start matching with "%s", minimum score of %s and %s workers') % (
args.algorithm, minscore, workers)
# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()
# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()
for w in xrange(workers):
p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
p.start()
processes.append(p)
job_queue.put("STOP")
for p in processes:
p.join()
writer_queue.put("STOP")
Я довольно много читал здесь о том, что многопроцессорность иногда замедляется, и я знаю, что это связано с накладными расходами на создание и управление новыми процессами. Кроме того, когда работа, которую нужно выполнить, недостаточно «большая», эффект многопроцессорности может быть незаметен. Однако в этом случае я думаю, что работа довольно большая, и версия Pool.map(), кажется, доказывает это, потому что она намного быстрее.
Я делаю что-то действительно неправильно, управляя всеми этими процессами и передавая объекты очереди? Как это можно оптимизировать, чтобы результаты можно было записывать в файл по мере их обработки, чтобы минимизировать объем памяти, необходимый при его выполнении?
Спасибо!
pool
, вы можете попробовать использоватьpool.imap
, чтобы получить итератор, дающий результат значения результатов по мере их вычисления рабочими процессами. В кодеmap
просто заменитеimap
наmap
и переместите вызовыpool.close
иpool.join
ниже цикла, который записывает результаты, и все готово! - person Blckknght   schedule 15.11.2014chunksize
. Если я правильно все понимаю,map
по умолчанию использует более крупные куски, чемimap
, что может значительно снизить накладные расходы, если последовательность, которую нужно отобразить, очень длинная. - person Blckknght   schedule 16.11.2014chunksize=1
по умолчанию до, скажем,chunksize=100
), но накладные расходы должны продолжать уменьшаться по мере того, как вы их увеличиваете. Методmap
вычисляет размер фрагмента по умолчанию, эквивалентныйmath.ceil(len(iterable) / 4.0 / cpu_count()))
, так что можно было бы начать и сimap
. - person Blckknght   schedule 17.11.2014