Как освободить память от процесса в multiprocessing.queue?

У меня есть программа, которая пытается предсказать конверсию электронной почты для каждого электронного письма, которое я отправляю за неделю (так что обычно 7 отправок). На выходе получается 7 разных файлов с оценками прогнозов для каждого клиента. Их последовательный запуск может занять около 8 часов, поэтому я попытался распараллелить их с помощью multiprocessing. Это очень хорошо ускоряет работу, но я заметил, что после завершения процесса он, кажется, удерживает свою память до тех пор, пока ее не останется, и один из процессов будет уничтожен системой, не выполнив свою задачу.

Я взял следующий код из "manual pool' в этом ответе, так как мне нужно ограничить количество процессов, которые запускаются одновременно, из-за нехватки памяти. Я бы хотел, чтобы по завершении одного процесса он освобождал память для системы, освобождая место для следующего рабочего процесса.

Ниже приведен код, который обрабатывает параллелизм:

def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()
        print key

        if key == None:
            return

        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = []

    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work,results))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    for p in processes:
        print "Joining Worker"
        p.join()

Вот фактический рабочий код, если это поможет:

def work_loop(key):
    with open('email_training_dict.pkl','rb') as f:
        training_dict = pickle.load(f)
    df_test = pd.DataFrame.from_csv(test_file)
    outdict = {}
    target = 'is_convert'

    df_train = train_dataframe(key)
    features = data_cleanse(df_train,df_test)

    # MAIN PREDICTION
    print 'Start time: {}'.format(datetime.datetime.now()) + '\n'

    # train/test by mailer
    X_train = df_train[features]
    X_test = df_test[features]
    y_train = df_train[target]

    # run model fit
    clf = imbalance.ImbalanceClassifier()

    clf = clf.fit(X_train, y_train)
    y_hat = clf.predict(X_test)

    outdict[key] = clf.y_vote
    print outdict[key]
    print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'
    with open(output_file,'wb') as f:
        pickle.dump(outdict,f)

person jpavs    schedule 10.06.2015    source источник
comment
Насколько вы уверены, что утечка памяти связана с многопроцессорностью? Вы все еще видите увеличение потребления памяти, если вы вызываете свою функцию work_loop последовательно?   -  person ali_m    schedule 11.06.2015
comment
@ali_m Спасибо за ваш комментарий. Вы правы, есть какая-то утечка памяти. Интересно (или нет?), что программа не будет потреблять новый блок памяти для каждой итерации — она берет больше по мере необходимости. Так, например, если одна итерация work_loop занимает 1 ГБ памяти, а следующая — 500 МБ, программа использует всего 1 ГБ. Но если следующая итерация занимает 2 ГБ, то программа сохраняет все эти 2 ГБ. Я все еще ищу способ вернуть это обратно в систему.   -  person jpavs    schedule 18.06.2015
comment
Это на самом деле звучит как нормальное поведение для меня. Когда объект Python выходит из области видимости (или если вы удаляете его вручную), выделенная ему память не будет освобождена до тех пор, пока объект не будет удален сборщиком мусора. Точно когда это произойдет, довольно непредсказуемо, но вы можете форсировать сборку мусора, используя gc.collect(). Однако даже после того, как объект был собран мусором, только что освобожденная память может не быть восстановлена ​​ОС, поэтому, если вы наблюдаете за использованием памяти вашим процессом Python, вы не должны ожидать, что он немедленно выйдет из строя.   -  person ali_m    schedule 18.06.2015
comment
Если бы использование памяти вашим процессом Python увеличивалось на некоторое приращение каждый раз, когда вы запускали work_loop(), то я подозревал бы, что у вас была утечка памяти в work_loop.   -  person ali_m    schedule 18.06.2015
comment
В любом случае, я думаю, вы вряд ли получите удовлетворительный ответ на свой вопрос в его нынешнем виде, поскольку невозможно воспроизвести проблему без остальной части вашего кода и входных данных. У вас было бы гораздо больше шансов, если бы вы могли сократить свой код до MCVE (есть большая вероятность, что вы будете возможность самостоятельно определить причину в процессе).   -  person ali_m    schedule 19.06.2015


Ответы (1)


Я предполагаю, что, как и в примере, который вы связали, вы используете Queue.Queue() в качестве объекта очереди. Это блокирующая очередь, что означает, что вызов queue.get() вернет элемент или будет ждать/блокировать, пока он не сможет вернуть элемент. Попробуйте изменить функцию work_controller на следующую:

def work_controller(in_queue, out_list):
  while True: # when the queue is empty return
      try:
          key = in_queue.get(False) # add False to not have the queue block
      except Queue.Empty:
          return
      print key

      work_loop(key)
      out_list.append(key)

Хотя вышеизложенное решает проблему блокировки, оно порождает другую. В начале жизни потоков в in_queue нет элементов, поэтому потоки немедленно завершатся.

Чтобы решить эту проблему, я предлагаю вам добавить флаг, чтобы указать, можно ли завершить работу.

global ok_to_end # put this flag in a global space

def work_controller(in_queue, out_list):
  while True: # when the queue is empty return
      try:
          key = in_queue.get(False) # add False to not have the queue block
      except Queue.Empty:
          if ok_to_end: # consult the flag before ending.
              return
      print key

      work_loop(key)
      out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = []

    ok_to_end = False # termination flag
    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work,results))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    ok_to_end = True # termination flag set to True after queue is filled

    for p in processes:
        print "Joining Worker"
        p.join()
person steve    schedule 10.06.2015