Как обрабатывать ваши данные параллельно с заранее определенным количеством потоков

Недавно я столкнулся с проблемой: мне нужно было создать большое количество (порядка 100) контейнеров Docker, а затем отправить их в реестр. Docker SDK для Python отлично справляется с этим, а вместе с библиотекой multiprocessing позволяет очень эффективно распараллеливать задачу. Однако после некоторого первоначального тестирования я обнаружил, что отправка нескольких изображений в реестр застопорилась, вероятно, из-за перегрузки одновременных загрузок. В моем тестировании я мог запускать только 2–3 одновременных docker push команды, пока все новые, которые я добавлял, не остановились. На этом этапе я решил ограничить одновременную загрузку небольшим количеством параллельных потоков, при этом по-прежнему используя большое количество потоков для облегчения сборки изображений. Комбинация очереди (multiprocessing.Queue) для передачи работы от потоков компоновщика к потокам-толкателям и пулу потоков (multiprocessing.Pool) выглядела как лучший кандидат. Тем не менее, в документации есть небольшие нюансы и пробелы, на изучение которых мне потребовалось время (особенно при использовании multiprocessing в Windows). Ниже я представлю небольшой учебник о том, как использовать эти структуры данных и объекты.

Постановка проблемы

В этой игрушечной задаче у нас есть большой массив параллельных процессов, записывающих результаты в очередь. Наряду с ними существует однопоточный процесс чтения, который проверяет наличие новых элементов в очереди и назначает их новым процессам в пуле, так что только небольшое фиксированное количество этих процессов выполняется одновременно. Давайте рассмотрим все элементы ниже.

Process

Для нашего большого массива параллельных потоков слева мы будем использовать multithreading.Process(). Из официальной справки:

Processobjects представляют собой действия, которые выполняются в отдельном процессе.

Для запуска процесса (ов) требуются 2 вещи: вызываемая целевая функция и сам Processcall. Давайте взглянем:

from multiprocessing import Process
def proc(i):
    print(f'I am Process {i}')
if __name__ ==  '__main__':
    for i in range(10):
        Process(target=proc, args=(i,)).start()

В приведенном выше примере мы создали 10 Processes и запустили их все одновременно. Каждый процесс запускает экземпляр функции proc() с аргументами, взятыми из arg. Поскольку порядок выполнения не гарантируется, когда мы запускаем его, мы получаем что-то вроде:

I am Process 6
I am Process 2
I am Process 0
I am Process 3
I am Process 7
I am Process 4
I am Process 8
I am Process 1
I am Process 5
I am Process 9

Также обратите внимание на интересный синтаксис args=(i,). Process требует, чтобы args был итеративным, поэтому изменение его на args=(i) или args=i приведет к TypeError.

Очередь

Теперь пора представить multithreading.Queue(). По справке:

Queue() возвращает общую очередь процесса, реализованную с использованием канала и нескольких блокировок / семафоров.

Очередь позволяет нам помещать в нее объекты и асинхронно обрабатывать их в другом месте. Важно отметить, что очереди безопасны для потоков и процессов. Давайте изменим наш предыдущий пример, чтобы добавить объект Queue и передать его нашим параллельным Processes:

from multiprocessing import Process, Queue
def writer(i,q):
    message = f'I am Process {i}'
    q.put(message)
if __name__ ==  '__main__':
    # Create multiprocessing queue
    q = Queue()
    
    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i,q,)).start()
    # Read the queue sequentially
    for i in range(10):
        message = q.get()
        print(message)

Имейте в виду, что Queue.get() - это метод блокировки, поэтому мы не пропустим ни одного сообщения в этой очереди.

Следующим шагом в решении нашей проблемы является переход на параллельное чтение из очереди. Мы могли бы просто порождать процессы чтения так же, как мы порождали писателей, но это позволит запускать до 10 потоков параллельно. Что делать, если нас ограничивает меньшее количество читателей, как в исходном описании проблемы?

Бассейн

Введите multithreading.Pool():

«Объект пула процессов, который управляет пулом рабочих процессов, которым могут быть отправлены задания. Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и имеет параллельную реализацию карты ».

Используя Pool, мы можем назначить столько параллельных процессов, сколько захотим, но в любой момент будет активным только количество потоков «процессов».

Посмотрим, как он себя поведет, если выбросить всех читателей в `Pool`:

from multiprocessing import Process, Queue, Pool
def writer(i,q):
    message = f’I am Process {i}’
    q.put(message)
def reader(i,q):
    message = q.get()
    print(message)
if __name__ == ‘__main__’:
    # Create multiprocessing queue
    q = Queue()
    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i,q,)).start()
    # Create multiprocessing pool
    p = Pool(10)
    # Create a group of parallel readers and start them
    # Number of readers is matching the number of writers
    # However, the number of simultaneously running
    # readers is constrained to the pool size
    
    for i in range(10):
        p.apply_async(reader, (i,q,))

Однако, если мы запустим приведенный выше код, мы ничего не получим. Что случилось? Когда мы вызвали apply_async, выполнение кода сразу же продолжилось и, поскольку больше ничего не осталось в основной функции, завершилось. К счастью, multiprocessing справочник дает возможность дождаться результатов выполнения:

from multiprocessing import Process, Queue, Pool
def writer(i,q):
    message = f’I am Process {i}’
    q.put(message)
def reader(i,q):
    message = q.get()
    print(message)
if __name__ == ‘__main__’:
    # Create multiprocessing queue
    q = Queue()
    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i,q,)).start()
    # Create multiprocessing pool
    p = Pool(10)
    # Create a group of parallel readers and start them
    # Number of readers is matching the number of writers
    # However, the number of simultaneously running
    # readers is constrained to the pool size
    readers = []
    for i in range(10):
        readers.append(p.apply_async(reader, (i,q,)))
    # Wait for the asynchrounous reader threads to finish
    [r.get() for r in readers]

На этот раз, если мы запустим код, мы получим следующую ошибку: RuntimeError: Queue objects should only be shared between processes through inheritance. multiprocessing.Manager позволит нам управлять очередью, а также сделать ее доступной для разных рабочих:

from multiprocessing import Process, Queue, Pool, Manager
def writer(i,q):
    message = f’I am Process {i}’
    q.put(message)
def reader(i,q):
    message = q.get()
    print(message)
if __name__ == ‘__main__’:
    # Create manager
    m = Manager()
    # Create multiprocessing queue
    q = m.Queue()
    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i,q,)).start()
    # Create multiprocessing pool
    p = Pool(10)
    # Create a group of parallel readers and start them
    # Number of readers is matching the number of writers
    # However, the number of simultaneously running
    # readers is constrained to the pool size
    readers = []
    for i in range(10):
        readers.append(p.apply_async(reader, (i,q,)))
    # Wait for the asynchrounous reader threads to finish
    [r.get() for r in readers]

Наконец, мы можем получить ожидаемые результаты:

> python pl.py
I am Process 1
I am Process 4
I am Process 9
I am Process 8
I am Process 0
I am Process 5
I am Process 7
I am Process 2
I am Process 6
I am Process 3

Причуды, связанные с Windows

Сначала я начал работать над этой проблемой на машине под управлением Linux, но позже продолжил работу над Windows. К сожалению, многие вещи сработали не сразу. Вот что вам нужно знать:

1. Прерывание выполнения программы (Ctrl + C) не сработает сразу с приведенным выше кодом. Обходной путь - добавить рабочих инициализаторов:

def init_worker():
    """
    Pool worker initializer for keyboard interrupt on Windows
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
p = Pool(num_readers, init_worker)

2. Мне не удалось запустить код в записной книжке Jupyter в Windows, если я не перенесу рабочие функции в отдельный .py файл и не импортирую их в свою записную книжку. В связи с этим вы не сможете запускать приведенные выше сценарии, не заключив основной код в if __name__ == ‘main':.

Конечный результат

В качестве завершающих штрихов добавим следующее:
• задержки для имитации работы, связанной с ЦП, для считывателя и записывающего устройства
• обработка исключений при ожидании завершения потоков считывателя
• настраиваемое количество модулей записи и считывания потоки
• некоторая документация по функциям

Вот окончательный результат:

from multiprocessing import Pool, Queue, Process, Manager
import random
import signal
import time
num_writers = 10
num_readers = 3
def writer(i,q):
    # Imitate CPU-bound work happening in writer
    delay = random.randint(1,10)
    time.sleep(delay)
    
    # Put the result into the queue
    t = time.time()
    print(f’I am writer {i}: {t}’)
    q.put(t)
def init_worker():
    """
    Pool worker initializer for keyboard interrupt on Windows
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
def reader(i, q):
    """
    Queue reader worker
    """
    
    # Read the top message from the queue
    message = q.get()
    
    # Imitate CPU-bound work happening in reader
    time.sleep(3)
    print(f’I am reader {i}: {message}’)
if __name__ == ‘__main__’:
    # Create manager
    m = Manager()
    
    # Create multiprocessing queue
    q = m.Queue()
    # Create a group of parallel writers and start them
    for i in range(num_writers):
        Process(target=writer, args=(i,q,)).start()
    
    # Create multiprocessing pool
    p = Pool(num_readers, init_worker)
    # Create a group of parallel readers and start them
    # Number of readers is matching the number of writers
    # However, the number of simultaneously running
    # readers is constrained to the pool size
    readers = []
    for i in range(10):
        readers.append(p.apply_async(reader, (i,q,)))
    
    # Wait for the asynchrounous reader threads to finish
    try:
        [r.get() for r in readers]
    except:
        print(‘Interrupted’)
        p.terminate()
        p.join()

Если вы запустите его, вы получите примерно такой результат:

```

> python final.py
I am writer 8: 1580659076.783544
I am writer 3: 1580659076.783544
I am reader 0: 1580659076.783544
I am reader 1: 1580659076.783544
I am writer 7: 1580659079.7990372
I am writer 2: 1580659080.7971141
I am writer 1: 1580659081.785277
I am writer 4: 1580659082.7955923
I am reader 2: 1580659079.7990372
I am reader 3: 1580659080.7971141
I am writer 6: 1580659083.800029
I am writer 0: 1580659084.7862694
I am reader 4: 1580659081.785277
I am writer 9: 1580659085.7819643
I am writer 5: 1580659085.7919443
I am reader 5: 1580659082.7955923
I am reader 6: 1580659083.800029
I am reader 7: 1580659084.7862694
I am reader 8: 1580659085.7819643
I am reader 9: 1580659085.7919443

Этот пост изначально был опубликован в моем блоге.