Многопроцессорные очереди Python медленнее, чем pool.map

Недавно я начал экспериментировать с многопроцессорностью, чтобы ускорить задачу. Я создал скрипт, который выполняет нечеткое сопоставление строк и вычисляет баллы с использованием разных алгоритмов (я хотел сравнить разные методы сопоставления). Полный исходный код можно найти здесь: https://bitbucket.org/bergonzzi/fuzzy-compare/src. На вход подаются 2 файла, которые объединены в пары (каждая строка файла1 с каждой строкой файла2). Для каждой пары вычисляются нечеткие оценки соответствия.

Я сделал 3 версии. Работая с образцами данных, представленными в моем репозитории (который состоит из 697 340 элементов после объединения в пары), у меня есть следующие тайминги:

  • Простой одиночный процесс - 0:00:47
  • Мультипроцесс с использованием Pool.map() — 0:00:13
  • Многопроцессорность с использованием очередей (шаблон производитель/потребитель) — 0:01:04

Я пытаюсь понять, почему моя версия Pool.map() намного быстрее, чем моя версия Queue, которая на самом деле медленнее, чем простая версия с одним процессом.

Моя причина даже попытки использовать очереди заключается в том, что версия Pool.map() удерживает результаты до тех пор, пока все не будет завершено, и записывает в файл только в конце. Это означает, что для больших файлов требуется много памяти. я говорю об этой версии (ссылка на нее, потому что это много кода для вставки сюда).

Чтобы решить эту проблему, я преобразовал его в шаблон производителя/потребителя (или по крайней мере пытался). Здесь я сначала создаю задания, комбинируя оба входных файла и помещая их в очередь, которую обрабатывают потребители (вычисляя оценки нечеткого совпадения). Выполненные работы помещаются в очередь. Затем у меня есть один процесс, который берет выполненные элементы из этой очереди и записывает их в файл. Таким образом, теоретически мне не понадобится столько памяти, так как результаты будут сброшены на диск. Вроде работает нормально, но намного медленнее. Я также заметил, что 4 процесса, которые я создаю, похоже, не используют 100% ЦП при просмотре монитора активности в Mac OSX (что не относится к версии Pool.map()).

Еще одна вещь, которую я заметил, это то, что моя функция-производитель, кажется, правильно заполняет очередь, но процессы-потребители, похоже, ждут, пока очередь не заполнится, вместо того, чтобы начать работать, как только прибудет первый элемент. Я наверное что-то там не так делаю...

Для справки вот некоторый соответствующий код для версии Queue (хотя лучше посмотреть полный код в репозитории, указанном выше).

Вот моя функция производителя:

def combine(list1, list2):
    '''
    Combine every item of list1 with every item of list 2,
    normalize put the pair in the job queue.
    '''
    pname = multiprocessing.current_process().name
    for x in list1:
        for y in list2:
            # slugify is a function to normalize the strings
            term1 = slugify(x.strip(), separator=' ')
            term2 = slugify(y.strip(), separator=' ')
            job_queue.put_nowait([term1, term2])

Это функция записи:

def writer(writer_queue):
    out = open(file_out, 'wb')
    pname = multiprocessing.current_process().name
    out.write(header)
    for match in iter(writer_queue.get, "STOP"):
        print("%s is writing %s") % (pname, str(match))
        line = str(';'.join(match) + '\n')
        out.write(line)
    out.close()

Это рабочая функция, которая выполняет фактические вычисления (убрана большая часть кода, поскольку здесь это не имеет значения, полный источник в репо):

def score_it(job_queue, writer_queue):
    '''Calculate scores for pair of words.'''
    pname = multiprocessing.current_process().name

    for pair in iter(job_queue.get_nowait, "STOP"):
        # do all the calculations and put the result into the writer queue
        writer_queue.put(result)

Вот как я настроил процессы:

# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()

workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers)

# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()

print("Job queue size: %s") % job_queue.qsize()

# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

for w in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)
    job_queue.put("STOP")

for p in processes:
    p.join()

writer_queue.put("STOP")

Я довольно много читал здесь о том, что многопроцессорность иногда замедляется, и я знаю, что это связано с накладными расходами на создание и управление новыми процессами. Кроме того, когда работа, которую нужно выполнить, недостаточно «большая», эффект многопроцессорности может быть незаметен. Однако в этом случае я думаю, что работа довольно большая, и версия Pool.map(), кажется, доказывает это, потому что она намного быстрее.

Я делаю что-то действительно неправильно, управляя всеми этими процессами и передавая объекты очереди? Как это можно оптимизировать, чтобы результаты можно было записывать в файл по мере их обработки, чтобы минимизировать объем памяти, необходимый при его выполнении?

Спасибо!


person bergonzzi    schedule 15.11.2014    source источник
comment
Я не знаю, что происходит с производительностью вашей системы, основанной на очередях (я еще не очень внимательно смотрел), но чтобы решить проблему потребления памяти вашей версией на основе pool, вы можете попробовать использовать pool.imap, чтобы получить итератор, дающий результат значения результатов по мере их вычисления рабочими процессами. В коде map просто замените imap на map и переместите вызовы pool.close и pool.join ниже цикла, который записывает результаты, и все готово!   -  person Blckknght    schedule 15.11.2014
comment
Большое спасибо за предложение, ваше решение действительно решает проблему с памятью. С imap каждый процесс занимает менее 100 МБ, а с картой — до 1 ГБ. Однако это становится медленнее - на моих тестовых данных я получаю 34 секунды с imap (против 13 секунд с картой). Любая идея, почему это может быть?   -  person bergonzzi    schedule 15.11.2014
comment
Возможно, это связано с параметром chunksize. Если я правильно все понимаю, map по умолчанию использует более крупные куски, чем imap, что может значительно снизить накладные расходы, если последовательность, которую нужно отобразить, очень длинная.   -  person Blckknght    schedule 16.11.2014
comment
И как мне найти идеальный размер фрагмента для оптимизации производительности? Спасибо!   -  person bergonzzi    schedule 16.11.2014
comment
Я бы предложил использовать как можно больший размер блока без нехватки памяти. Конечно, наибольшая выгода, вероятно, будет получена раньше (увеличение от chunksize=1 по умолчанию до, скажем, chunksize=100), но накладные расходы должны продолжать уменьшаться по мере того, как вы их увеличиваете. Метод map вычисляет размер фрагмента по умолчанию, эквивалентный math.ceil(len(iterable) / 4.0 / cpu_count())), так что можно было бы начать и с imap.   -  person Blckknght    schedule 17.11.2014
comment
Тестируя большие данные, я обнаружил, что размер фрагмента 100 (по крайней мере, для этого варианта использования) обеспечивает наилучшие результаты с точки зрения скорости и памяти. Дополнительные процессы занимают всего около 3 МБ. Использование предложенной вами формулы использует слишком много памяти для каждого процесса (опять же, в моем случае использования). Основной процесс по-прежнему использует много (около 1 ГБ), но это, вероятно, потому, что я вычисляю большой список из почти 10 миллионов элементов и храню его в памяти. Я все еще не мог найти способ использовать map() с ленивым генератором вместо простого списка...   -  person bergonzzi    schedule 18.11.2014


Ответы (1)


Я думаю, что проблема с вашими таймингами заключается в том, что в вашей версии многопоточной очереди отсутствует оптимизация. Вы сделали комментарий, по существу говоря, что ваша job_queue заполняется до того, как рабочие потоки начнут брать из нее задания. Я считаю, что причиной этого является c.join(), который у вас есть в очереди заданий #Fill up. Это предотвращает продолжение основного потока до тех пор, пока очередь заданий не будет заполнена. Я бы переместил c.join() в конец после p.join(). Вам также нужно будет найти способ поставить стоп-флажки в конец очереди. Функция объединения может быть хорошим местом для этого. Что-то вроде добавления x количества стоп-флажков после того, как закончились данные для объединения.

Еще одно замечание: вы записываете свою переменную w в рамках цикла for, который запускает процессы p. Что касается стиля/удобочитаемости/и т. д., я бы изменил w на другое имя переменной. Если вы его не используете, подчеркивание работает как хорошее одноразовое имя переменной. т.е.

for w in xrange(workers):

должен стать

for _ in xrange(workers):

Короче говоря, если вы переместите c.join() в конец, вы должны получить более точные тайминги. В настоящее время единственное, что является многопоточным, — это нечеткое сопоставление строк. Одним из преимуществ наличия потока производителя/потребителя является то, что потокам-потребителям не нужно ждать завершения потока производителя, и, таким образом, вы в конечном итоге используете меньше памяти.

person Bryan Lott    schedule 10.12.2014
comment
Спасибо за совет! Имеет смысл присоединиться к процессу подачи заданий в конце... Думаю, я все еще не понимал, что на самом деле делает join(), теперь мне все ясно. Что я до сих пор не очень хорошо понимаю, так это ваше предложение. Вам также нужно найти способ поставить свои стоп-флаги в конец очереди. Функция объединения может быть хорошим местом для этого. Что-то вроде добавления x количества стоп-флажков после того, как закончились данные для объединения. - что ты конкретно имеешь ввиду? Почему x количество стоп-флажков? И как я могу обнаружить, что у меня больше нет данных для объединения? - person bergonzzi; 11.12.2014
comment
Поскольку ваши данные поступают из файлов, всякий раз, когда файлы заканчиваются, вы знаете, что у вас больше нет данных для объединения. Вашим потребительским потокам потребуется какой-то сигнал, чтобы знать, когда завершать работу, таким образом, реализация стоп-флагов. Всякий раз, когда поток-потребитель извлекает задание из очереди, и это STOP, None или что-то подобное (вам нужно будет выбрать, что лучше для вашего приложения), поток знает, что он может завершить работу. Как я сделал это в аналогичном приложении, мой поток-производитель, после того как он был завершен, создавая задания, выдавал стоп-флаги, равные количеству потоков-потребителей. - person Bryan Lott; 11.12.2014