Когда я начал свой путь, пытаясь использовать несколько ядер в своем коде Python, я обнаружил, что не существует отдельного блога, который помог бы мне достичь моей цели. Я надеюсь, что этот блог заполнит некоторые из выявленных мною пробелов и позволит читателям быстро пройти через процесс внедрения многопроцессорной обработки, чтобы полностью использовать все ваши вычислительные ресурсы!

Этот блог состоит из нескольких разделов, которые я попытался разбить в порядке, который мне кажется логичным. Разбивку контента можно найти ниже:

  • Пакеты
  • ядра
  • Сохранение результатов
  • Функции
  • Запуск вашей функции
  • Что следует учитывать
  • Пример

Пакеты

Во-первых, чтобы создать собственную многопроцессорную программу, вам понадобится специальный пакет. В этой статье я буду использовать многопроцессорность из многопроцессорность для создания примера

import multiprocessing

Ядра

Теперь, когда вы импортировали многопроцессорность в свой ноутбук/среду, вы сможете увидеть, сколько ядер вам доступно. Вы можете сделать это, просто используя приведенный ниже код.

multiprocessing.cpu_count()

Это выведет целое число, которое будет максимальным количеством ядер, на которых вы можете запустить свою многопроцессорную обработку.

СОВЕТ: Чтобы гарантировать, что вы всегда используете максимальные вычислительные ресурсы, я обычно устанавливаю это в качестве входных данных в моем скрипте — процессоры = multiprocessing.cpu_count()

Сохранение результатов

Это потенциально самая сложная вещь для понимания из существующей документации, поскольку неясно, как извлекать результаты из нескольких ядер при выполнении многопроцессорной обработки.

Чтобы извлечь результаты, вам нужно создать специальный словарь, в который можно передать ваши результаты, как показано ниже.

multiprocessing_dict = multiprocessing.Manager().dict()

Это установит multiprocessing_dict_ как пустой объект DictProxy в вашем скрипте. Вы увидите, как сохранить результаты позже в этой статье.

Функция

Итак, вот часть, которую вы настраиваете в соответствии с вашими потребностями в многопроцессорной обработке.

Чтобы выполнить любую задачу на ядрах, вам нужно создать функцию, которую может вызывать процесс, который вы бы написали как обычно. Однако, чтобы разрешить ее использование и сохранение результатов с помощью многопроцессорной обработки, вам необходимо добавить в функцию два конкретных аргумента.

Эти;

  1. Процессор, на котором он работает;
  2. Словарь, который мы создали на предыдущем шаге.

Затем вместо того, чтобы возвращать выходные данные функции с помощью return , как обычно, вы передаете их аргументу multi_dict_ с процессором_ в качестве ключа.

def your_func(your args, processor_, multi_dict_):
    
    your function 
    
    multi_dict_[processor_] = your output

ПРИМЕЧАНИЕ. Я попытался написать общую оболочку функции, чтобы продемонстрировать требования к аргументам для вашей функции. Если из этого неясно, возможно, стоит обратиться к примеру кода, прежде чем продолжить.

Запуск вашей функции

Теперь, когда мы создали функцию, мы можем посмотреть, как можно запустить ее по количеству найденных ранее ядер. Для этого есть несколько шагов, поэтому я попытаюсь их разбить.

  1. Во-первых, вам нужно создать пустой список, в котором будут храниться выходные данные вашей функции.
  2. Затем нужно создать цикл по количеству процессоров. Когда вы вызываете многопроцессорную функцию, вы можете передать ей свою функцию с соответствующими аргументами. Затем вы сохраняете это в своем пустом списке, созданном на шаге 1.
  3. Теперь у вас есть команды многопроцессорной обработки, сохраненные в вашем списке, вы можете просмотреть этот список и запустить многопроцессорную обработку.
  4. Затем вы можете объединить результаты в ваших списках вместе.

Многое из этого может не иметь смысла на данный момент, поэтому давайте посмотрим, как это выглядит в коде, с шагами, помеченными как комментарии.

1#
multiprocessing_loop_ = []
2#
for processor_ in range(processors):
    
    multi_process_ = multiprocessing.Process(
        target = your_func, 
        args = (your args, processor_, multiprocessing_dict)
    )
    multiprocessing_loop_.append(multi_process_)
#3
for process_ in multiprocessing_loop_:
    process_.start()
#4
for process_ in multiprocessing_loop_:
    process_.join()

ПРИМЕЧАНИЕ. Опять же, я попытался сделать эту часть кода общей, чтобы продемонстрировать рабочие требования. Если из этого неясно, возможно, стоит обратиться к примеру кода, прежде чем продолжить.

Что следует учитывать

Разделение ваших входных данных.В приведенном выше примере мы будем запускать одну и ту же функцию для одних и тех же входных данных на любом количестве ядер, которое вы указали.

Если вы хотите запускать определенные подмножества входных данных, важно, чтобы вы встроили эти шаги до многопроцессорной обработки.

Я продемонстрировал, как это можно сделать, на примере в конце статьи.

Понимание ваших выходных данных —В приведенном выше примере ваши выходные данные будут храниться в виде словаря в списке. Поэтому еще предстоит проделать работу, чтобы преобразовать выходные данные в пригодный для использования формат.

Я продемонстрировал, как это можно сделать, на примере в конце статьи.

Операционная система Windows. К сожалению, нет простого способа развернуть многопроцессорную обработку в операционной системе Windows. Насколько я понимаю, это связано с требованием fork().

Из-за этого я не буду пытаться решать эту проблему в рамках этой статьи.

Пример

Наконец, давайте объединим то, что мы узнали, в один скрипт.

Для этого я создал DataFrame с двумя столбцами. Одно — случайное число, другое — случайное целое число (от 1 до 100). Я смоделировал 10 000 000 строк для каждого из этих столбцов.

Моя функция возведет мое случайное число в степень моего случайного целого числа.

Затем это будет добавлено в фрейм данных как столбец «RandomPower».

import pandas as pd
import numpy as np
import multiprocessing as mp
processors = mp.cpu_count()
mp_dict = mp.Manager().dict()
mp_loop = []
size = 10000000
input_step = int(size / processors)
df = pd.DataFrame(data = {
    'RandomNumber' : np.random.random(size = size),
    'RandomInt' : np.random.randint(0, high = 100, size = size)
})
def func(df_, indexlist_, processor_, multi_dict_):
    '''
    This function will raise the RandomNumber in the dataframe by the power of the RandomInt in the dataframe.
    ''' 
    series_ = pd.Series(
        data = df_.loc[indexlist_, 'RandomNumber'].values ** df_.loc[indexlist_, 'RandomInt'].values,
        index = indexlist_
    )
multi_dict_[processor_] = series_.to_dict()
for processor_ in range(processors):    
    if processor_ == 0:       
        index_ = list(range(0, input_step, 1))
    elif processor_ == processors - 1:
        index_ = list(range(processor_ * input_step, size, 1))
    else:
        index_ = list(range(input_step * processor_, input_step * (processor_ + 1), 1))
mp_ = mp.Process(
        target = func,
        args = (df, index_, processor_, mp_dict)
    )    
    mp_loop.append(mp_)
for l_ in mp_loop:
    l_.start()
    
for l_ in mp_loop:
    l_.join()
    
df_results = pd.DataFrame([])
    
for key_ in mp_dict.keys():
    df_results = df_results.append(
        pd.DataFrame.from_dict(
            mp_dict[key_], orient = 'index', columns = ['RandomPower']
        )
    )
    
df = df.merge(
    df_results, 
    how = 'left',
    left_index = True, 
    right_index = True
)

ПРИМЕЧАНИЕ. Этот конкретный пример на самом деле требует больше времени для запуска на нескольких ядрах, чем если бы вы использовали numpy. Он был создан исключительно для демонстрации того, как манипулировать и извлекать результаты из вашей функции при запуске на нескольких ядрах.

Я надеюсь, что вы нашли этот блог полезным.

Написать и опубликовать свою первую статью очень сложно, поэтому любой отзыв будет очень полезен — а также, надеюсь, несколько аплодисментов!

Кроме того, если вы заметите какие-либо ошибки в моем коде, сообщите мне, чтобы я мог исправить их соответствующим образом.