Как описано в начале [1]: «Модуль Queue реализует очереди с несколькими производителями и несколькими потребителями. Это особенно полезно в многопоточном программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue в этом модуле реализует всю необходимую семантику блокировки». Поэтому реализация многопоточности с помощью Queue, на мой взгляд, весьма удобна и доступна для новичков.

Я настоятельно рекомендую ссылки [2][3] для лучшего понимания этой темы. Обратите внимание, что после Python 2.4 обычно рекомендуется модуль threading для многопоточного программирования вместо модуля thread в предыдущей версии. Не запутайтесь, когда сканируете Интернет в поисках информации.

Также обратите внимание на хорошо известную проблему Global Interpreter Lock (GIL) в CPython [4]. Мы обсудим это позже.

Многопоточность с Queue в Python

1. Модель производитель-потребитель

Допустим, вы хотите распараллелить фрагмент кода с помощью многопоточности. Вкратце основная идея такова: а) Создаем несколько воркеров (потребителей/потоков) и ждем поступления задач. б) Строим очередь, через которую основной поток (продюсер) «скармливает» задачи воркерам. c) Рабочие просто продолжают брать задачи одну за другой из очереди. г) Основной поток ждет, пока очередь не опустеет. Давайте сначала посмотрим, как может выглядеть пример, затем я объясню, как это работает. Приведенный ниже сценарий создает 2 потока с задачами простого вывода порядковых номеров. (Примечание: аналогичный пример можно найти в [1])

2. Определите задачу

Разница между обычной функцией Python и функциями, написанными для многопоточности с помощью очереди, заключается в том, что последняя принимает в качестве аргумента только одну очередь, а затем внутри функции извлекает из этой очереди нужные ей аргументы:

def foo(queue):
    while True:
        (argument) = queue.get()

Перед выходом из функции не забудьте отметить очередь, что эта задача выполнена и ее можно убрать из очереди:

    queue.task_done()

3. Создайте рабочих

Мы инициализируем поток с помощью threading.Thread с двумя параметрами target и args. target указывает имя функции, которую будет выполнять поток, а args назначает входной аргумент этой функции. Как мы уже говорили ранее, входной аргумент теперь представляет собой просто очередь. Вы можете представить работника, сидящего перед трубой и ожидающего, когда мы (основной поток) подадим настоящие входные данные.

worker = threading.Thread(target=foo, args=(myQueue,))

Теперь важно назначить этого работника демоном. Основное различие в поведении между потоками демона и не-демона заключается в том, что первый завершается вместе с основным потоком, а второй — нет [5].

worker.daemon = True

Конечно, не забудьте начать тему:

worker.start()

4. Очередь и жди

После того, как все готово, теперь мы просто подаем аргументы в очередь:

myQueue.put((......))

, и подождите, пока очередь не опустеет (т.е. все задачи будут выполнены) по потокам:

myQueue.join()

Обсуждение

Остановка потока

Уничтожение или завершение потока в Python не рекомендуется и не так просто, как вы могли ожидать [6]. Однако по-прежнему можно остановить поток, поместив флаг threading.Event() в аргумент функции задачи в дополнение к очереди и установив флаг при необходимости [7]:

Однако обратите внимание, что если после остановки потока вы снова начнете отправлять входные данные в эту очередь, остановленный поток «пробуждается» и начинает выполнять задачи.

Глобальная блокировка интерпретатора (GIL)

GIL — это семафороподобный механизм в CPython. Несмотря на то, что он гарантирует безопасность потоков, накладные расходы на сигнализацию фактически заставляют программы работать медленнее с многопоточным, чем с однопоточным, и медленнее на многоядерном компьютере, чем с одноядерным, особенно для программ, ограниченных ЦП. [8]

Итак, суть в том, что если вы просто работаете с программами, ограниченными вводом-выводом, и не пишете расширения C, то все должно быть в порядке [4]; в противном случае вам может потребоваться отключить GIL или просто переключиться на модуль multiprocessing. [9]

Похожие сообщения

В Интернете есть множество примеров, когда люди оборачивают объект потока еще одним самоопределяемым классом потока. См. примеры в [10][11]. Также см. [12] для более подробного объяснения многопоточности/многопроцессорной обработки в Python.

использованная литература

[1] 8.10. Queue — класс синхронизированной очереди
https://docs.python.org/2/library/queue.html

[2] threading — Управление параллельными потоками http://pymotw.com/2/threading/

[3] Очередь — реализация потокобезопасного FIFO http://pymotw.com/2/Queue/

[4] Что такое глобальная блокировка интерпретатора (GIL)? http://stackoverflow.com/questions/1294382/what-is-a-global-interpreter-lock-gil

[5] многопоточность — свойство демона потока Python http://stackoverflow.com/questions/4330111/python-thread-daemon-property

[6] Есть ли способ убить поток в Python? http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python

[7] python — остановка потока через определенное время http://stackoverflow.com/questions/6524459/stopping-a-thread-after-a-certain-amount-of-time

[8] Внутри Python GIL, Дэвид Бизли.

видео: https://www.youtube.com/watch?v=ph374fJqFPE

слайды: http://www.dabeaz.com/python/GIL.pdf

[9] 16.6. multiprocessing — Многопоточный интерфейс на основе процессов https://docs.python.org/2/library/multiprocessing.html

[10] Практическое многопоточное программирование на Python http://www.ibm.com/developerworks/aix/library/au-threadingpython/

[11] Многопоточное программирование на Python http://www.tutorialspoint.com/python/python_multithreading.htm

[12] Параллелизм в Python от Mosky https://speakerdeck.com/mosky/concurrency-in-python

Первоначально опубликовано на ybdarrenwang.blogspot.com.