У меня есть программа, которая пытается предсказать конверсию электронной почты для каждого электронного письма, которое я отправляю за неделю (так что обычно 7 отправок). На выходе получается 7 разных файлов с оценками прогнозов для каждого клиента. Их последовательный запуск может занять около 8 часов, поэтому я попытался распараллелить их с помощью multiprocessing
. Это очень хорошо ускоряет работу, но я заметил, что после завершения процесса он, кажется, удерживает свою память до тех пор, пока ее не останется, и один из процессов будет уничтожен системой, не выполнив свою задачу.
Я взял следующий код из "manual pool' в этом ответе, так как мне нужно ограничить количество процессов, которые запускаются одновременно, из-за нехватки памяти. Я бы хотел, чтобы по завершении одного процесса он освобождал память для системы, освобождая место для следующего рабочего процесса.
Ниже приведен код, который обрабатывает параллелизм:
def work_controller(in_queue, out_list):
while True:
key = in_queue.get()
print key
if key == None:
return
work_loop(key)
out_list.append(key)
if __name__ == '__main__':
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
processes = []
for i in xrange(num_workers):
p = Process(target=work_controller, args=(work,results))
processes.append(p)
p.start()
iters = itertools.chain([key for key in training_dict.keys()])
for item in iters:
work.put(item)
for p in processes:
print "Joining Worker"
p.join()
Вот фактический рабочий код, если это поможет:
def work_loop(key):
with open('email_training_dict.pkl','rb') as f:
training_dict = pickle.load(f)
df_test = pd.DataFrame.from_csv(test_file)
outdict = {}
target = 'is_convert'
df_train = train_dataframe(key)
features = data_cleanse(df_train,df_test)
# MAIN PREDICTION
print 'Start time: {}'.format(datetime.datetime.now()) + '\n'
# train/test by mailer
X_train = df_train[features]
X_test = df_test[features]
y_train = df_train[target]
# run model fit
clf = imbalance.ImbalanceClassifier()
clf = clf.fit(X_train, y_train)
y_hat = clf.predict(X_test)
outdict[key] = clf.y_vote
print outdict[key]
print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'
with open(output_file,'wb') as f:
pickle.dump(outdict,f)
work_loop
последовательно? - person ali_m   schedule 11.06.2015work_loop
занимает 1 ГБ памяти, а следующая — 500 МБ, программа использует всего 1 ГБ. Но если следующая итерация занимает 2 ГБ, то программа сохраняет все эти 2 ГБ. Я все еще ищу способ вернуть это обратно в систему. - person jpavs   schedule 18.06.2015gc.collect()
. Однако даже после того, как объект был собран мусором, только что освобожденная память может не быть восстановлена ОС, поэтому, если вы наблюдаете за использованием памяти вашим процессом Python, вы не должны ожидать, что он немедленно выйдет из строя. - person ali_m   schedule 18.06.2015work_loop()
, то я подозревал бы, что у вас была утечка памяти вwork_loop
. - person ali_m   schedule 18.06.2015