Запуск моделей на нескольких ядрах с разными наборами данных в Python

У меня есть папка, содержащая несколько наборов данных, и я хочу запустить модель для этих наборов данных и распределить нагрузку между несколькими ядрами, надеюсь, чтобы увеличить общее время обработки данных.

Мой компьютер имеет 8 ядер. Это была моя первая попытка ниже, это всего лишь набросок, но, используя htop, я вижу, что для этой работы используется только 1 ядро. Многоядерный новичок здесь.

import pandas as pd
import multiprocessing
import os
from library_example import model_example

def worker(file_):
    to_save = pd.Series()
    with open(file_,'r') as f_open:
        data = f_open.read()

    # Run model 
    model_results = model_example(file_)

    # Save results in DataFrame
    to_save.to_csv(file_[:-4]+ "_results.csv", model_results )

file_location_ = "/home/datafiles/"
if __name__ == '__main__':
    for filename in os.listdir(file_location_):
        p = multiprocessing.Process(target=worker, args=(file_location_ + filename,))
        p.start()
        p.join()

person William Baker Morrison    schedule 07.11.2017    source источник


Ответы (1)


Попробуйте убрать p.join(). Это будет ждать завершения процесса, что фактически делает этот процесс последовательным, когда вы запускаете процесс (например, start), а затем ждете каждого из них (например, join). Вместо этого вы можете попробовать что-то вроде этого:

# construct the workers
workers = [multiprocessing.Process(target=worker, args=(file_location_ + filename,)) for filename in os.listdir(file_location_)]

# start them
for proc in workers:
    proc.start()

# now we wait for them
for proc in workers:
    proc.join()

(Я не пробовал запускать это в вашем коде, но что-то подобное должно работать.)

EDIT Если вы хотите ограничить количество рабочих/процессов, я бы рекомендовал просто использовать файл Pool. Вы можете указать, сколько процессов использовать, а затем map(..) аргументы для этих процессов. Пример:

# construct a pool of workers
pool = multiprocessing.Pool(6)
pool.map(worker, [file_location_ + filename for filename in os.listdir(file_location_)])
pool.close()
person Paul    schedule 07.11.2017
comment
Могу ли я ограничить количество используемых ядер? например 6 из 8 - person William Baker Morrison; 07.11.2017