Многопроцессорность и очередь с Dataframe

У меня возникли проблемы с обменом объектом (dataframe) между двумя процессами через Queue.

Первый процесс получает данные из очереди, второй помещает данные в очередь. Процесс put быстрее, поэтому процесс get должен очистить очередь, прочитав весь объект.

У меня странное поведение, потому что мой код работает отлично и, как и ожидалось, но только для 100 строк в кадре данных, для 1000 строк процесс получения всегда принимает только 1 объект.

import multiprocessing, time, sys
import pandas as pd

NR_ROWS = 1000
i = 0
def getDf():
    global i, NR_ROWS
    myheader = ["name", "test2", "test3"]                
    myrow1 =   [ i,  i+400, i+250]
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)
    i = i+1
    return df 


def f_put(q):
    print "f_put start"        

    while(1): 
        data = getDf()                
        q.put(data)
        print "P:", data["name"].iloc[0]         
        sys.stdout.flush()                    
        time.sleep(1.55)


def f_get(q):
    print "f_get start"    

    while(1):     
        data = pd.DataFrame()

        while not q.empty():
            data = q.get()
            print "get"

        if not data.empty:
            print "G:", data["name"].iloc[0] 
        else:
            print "nothing new"                       
        time.sleep(5.9)


if __name__ == "__main__":

    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=f_put, args=(q,))            
    p.start()
    while(1):
        f_get(q)

    p.join()

Вывод для 100-строчного фрейма данных, get-process принимает все объекты

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get         # get-process takes all 4 objects from the queue
get
get
get
G: 3
P: 4
P: 5
P: 6
get
get
get
G: 6
P: 7
P: 8

Вывод для кадра данных 1000 строк, get-process принимает только один объект.

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get     <-- #!!! get-process takes ONLY 1 object from the queue!!!
G: 1
P: 4
P: 5
P: 6
get
G: 2
P: 7
P: 8
P: 9
P: 10
get
G: 3
P: 11

Есть идеи, что я делаю неправильно и как передать больший фрейм данных?


person Meloun    schedule 05.08.2015    source источник
comment
Я быстро протестировал ваш код, и он работает так, как вы описываете, даже для N › 1000. Возможно ли, что вы используете какую-то старую версию pandas и/или многопроцессорность, которая дает такое поведение? (__version__: панды 0.16.2, многопроцессорность 0.70a1, питон 2.7.10)   -  person chris-sc    schedule 11.08.2015
comment
У меня все пакеты обновлены, но я все еще не получил ожидаемых результатов. Попробуйте это pastebin.com/bihSv93F Первая попытка делается вручную и работает, последний элемент читается G: 2. Затем я пытаюсь сделать то же самое с многопроцессорностью, и это не работает.   -  person Meloun    schedule 11.08.2015
comment
панды: 0.16.2, многопроцессорность: 0.70a1, питон 2.7.10   -  person Meloun    schedule 11.08.2015
comment
Когда я использую большой словарь вместо фрейма данных, я получаю такое же поведение.   -  person Meloun    schedule 12.08.2015
comment
Да. Посмотрите на мой ответ ниже. Проблема заключается не только в DataFrame как таковом, но и во всех объектах Python, размеры которых превышают некоторый порог, который будет зависеть от системы.   -  person chris-sc    schedule 12.08.2015


Ответы (1)


Рискуя оказаться не в состоянии предоставить полностью функциональный пример, вот что пойдет не так.

Прежде всего, это проблема времени.

Я снова попробовал ваш код с большими фреймами данных (10000 или даже 100000) и начал видеть то же, что и вы. Это означает, что вы видите это поведение, как только размер массивов пересекает определенный порог, который будет зависеть от системы (ЦП?).

Я немного изменил ваш код, чтобы было легче увидеть, что происходит. Во-первых, 5 DataFrames стоят put в очереди без каких-либо time.sleep пользовательских. В функции f_get я добавил счетчик (и time.sleep(0), см. ниже) в цикл (while not q.empty()).

Новый код:

import multiprocessing, time, sys                                                 
import pandas as pd                                                              

NR_ROWS = 10000                                                                  
i = 0                                                                            
def getDf():                                                                     
    global i, NR_ROWS                                                            
    myheader = ["name", "test2", "test3"]                                        
    myrow1 =   [ i,  i+400, i+250]                                               
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)                      
    i = i+1                                                                      
    return df                                                                    

def f_put(q):                                                                    
    print "f_put start"                                                          
    j = 0                                                                        
    while(j < 5):                                                                
        data = getDf()                                                           
        q.put(data)                                                              
        print "P:", data["name"].iloc[0]                                         
        sys.stdout.flush()                                                       
        j += 1                                                                   

def f_get(q):                                                                    
    print "f_get start"                                                          
    while(1):
        data = pd.DataFrame()                                                    
        loop = 0                                                                 
        while not q.empty():                                                     
            data = q.get()                                                  
            print "get (loop: %s)" %loop
            time.sleep(0)                                         
            loop += 1                                                            
        time.sleep(1.)                                                           

if __name__ == "__main__":                                                       

    q = multiprocessing.Queue()                                                  
    p = multiprocessing.Process(target=f_put, args=(q,))                         
    p.start()                                                                    
    while(1):                                                                    
        f_get(q)                                                                 
    p.join()

Теперь, если вы запустите это для другого количества строк, вы увидите что-то вроде этого:

N=100:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

N=10000:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 0)
get (loop: 0)
get (loop: 0)

О чем это нам говорит? Пока DataFrame мало, ваше предположение о том, что процесс put быстрее, чем get, кажется верным, мы можем получить все 5 элементов за один цикл while not q.empty().

Но по мере увеличения количества строк что-то меняется. Условие while q.empty() оценивается как True (очередь пуста) и внешний цикл while(1).

Это может означать, что put теперь медленнее, чем get, и нам нужно подождать. Но если мы установим время сна для всего f_get на что-то вроде 15, мы все равно получим такое же поведение.

С другой стороны, если мы изменим time.sleep(0) во внутреннем цикле q.get() на 1,

while not q.empty():                                                  
    data = q.get()                                                    
    time.sleep(1)                                                     
    print "get (loop: %s)" %loop                                      
    loop += 1

мы получаем это:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

Это выглядит правильно! А это значит, что на самом деле get делает что-то странное. Кажется, что пока он все еще обрабатывает get, состояние очереди empty, и после того, как get выполнено, доступен следующий элемент.

Я уверен, что для этого есть причина, но я недостаточно знаком с multiprocessing, чтобы понять это.

В зависимости от вашего приложения вы можете просто добавить соответствующий time.sleep во внутренний цикл и посмотреть, достаточно ли этого.

Или, если вы хотите решить эту проблему (вместо использования обходного пути в качестве метода time.sleep), вы можете заглянуть в multiprocessing и найти информацию о блокировке, неблокировке или асинхронная коммуникация — думаю, решение будет найдено там.

person chris-sc    schedule 11.08.2015
comment
Добрый день, ребята! Вы решили проблему? Я тоже новичок в этой сфере. А что если использовать блокировку на время взаимодействия одного рабочего процесса с очередью? Решит ли это проблему? - person Blademoon; 08.04.2021