В этой статье будет обсуждаться диспетчеризация событий по потокам в python 2, используя в качестве рабочего примера утилиту параллельной загрузки файлов. Прежде чем мы углубимся в это, давайте поговорим пару минут о Python 2 и потоках в целом. Упомяните эти две вещи вместе в предложении, и многие разработчики ответят недоумением. Python не является многопоточным, - говорят они, заканчивая вызовом ужасного GIL или Global Interpreter Lock. Это неверно. Python многопоточен. То, чем питон не является, является параллельным ̶c̶o̶n̶c̶u̶r̶r̶e̶n̶t̶ (спасибо Томашу П. за то, что он прямо указал мне на разницу, см. Его ответ ниже). Есть разница.

Python может запускать множество потоков, но из-за GIL интерпретатор python может запускать только один поток за раз. Это означает, что если поток хочет выполнить работу, он должен получить блокировку интерпретатора, что он может сделать только тогда, когда другой поток еще не имеет ее. GIL - это примитив синхронизации, который сериализует запросы к интерпретатору из разных потоков. Таким образом, разделение работы на несколько потоков на самом деле не ускоряет ее. Мы можем ясно увидеть это на простом примере:

import numpy.random
def sort_arrays(a1, a2):
  sorted(a1)
  sorted(a2)
test_arr_1 = numpy.random.randint(0,high=1000,size=1000000)
test_arr_2 = numpy.random.randint(0,high=1000,size=1000000)
%timeit sort_arrays(test_arr_1, test_arr_2)

1 loops, best of 3: 858 ms per loop

Этот фрагмент просто создает два массива по миллиону случайных целых чисел в каждом, а затем сортирует их. Таймер сообщает о наилучшем из трех моментов времени 858 мс для выполнения сортировки. Глядя на использование ядра с помощью mpstat во время работы программы, мы можем получить картину использования процессора:

04:20:15 PM CPU %usr ... %idle
04:20:16 PM all 26.18 ... 72.82
04:20:16 PM 0 3.00 ... 97.00
04:20:16 PM 1 97.00 ... 0.00
04:20:16 PM 2 2.00 ... 98.00
04:20:16 PM 3 2.00 ... 98.00

Ядро 1 почти полностью насыщено, но у нас есть три полных ядра, которые почти бездействуют. Бьюсь об заклад, мы сможем отсортировать эти массивы намного быстрее!

import numpy.random
from threading import Thread
def sort_arrays_t(a1, a2):
  def sort_array(a):
    sorted(a)
 
  t = Thread(target=lambda: sort_array(a2))
  t.start()
  sorted(a1)
  t.join()
  return
test_arr_1 = numpy.random.randint(0,high=1000,size=1000000)
test_arr_2 = numpy.random.randint(0,high=1000,size=1000000)
%timeit sort_arrays_t(test_arr_1, test_arr_2)
1 loops, best of 3: 892 ms per loop

Ой. Это не помогло. Давайте посмотрим на основное использование во время «параллельной» сортировки:

12:54:12 AM CPU %usr ... %idle
12:54:13 AM all 24.37 ... 74.12
12:54:13 AM 0 2.02 ... 95.96
12:54:13 AM 1 50.50 ... 47.52
12:54:13 AM 2 40.38 ... 58.65
12:54:13 AM 3 2.04 ... 95.92

Что, черт возьми, здесь происходит? В параллельной версии используются два ядра примерно с половиной мощности, и общее время работы ниже, чем в последовательной версии. Причина этого довольно проста: python делегирует планирование потоков операционной системе хоста. Это означает, что хост, в данном случае linux, может свободно следовать своему обычному алгоритму планирования, и это приводит к тому, что каждый из этих активных потоков назначается другому незанятому ядру. Однако для выполнения байт-кода Python каждый поток должен сначала получить блокировку интерпретатора. Это эффективно ограничивает работу двух потоков, так что одновременно может работать только один. Если бы мы создали третий, мы бы увидели, что каждый поток теперь получает 33% процессорного времени и так далее. Что еще хуже, получение GIL - это дорогостоящая операция сама по себе, поэтому в целом программа работает медленнее с использованием потоков. Спасибо Global Interpreter Lock!

Python многопоточен. То, чем не является питон, является параллельным. Есть разница.

Очевидно, что python не является параллельным. Если бы это было так, мы бы ожидали, что процесс займет примерно половину времени в многоядерной системе, где переключение контекста не требуется. Значит ли это, что несколько потоков не имеют значения в Python? Это не. Есть одна конкретная ситуация, когда несколько потоков действительно очень ценны, и это когда нам нужно дождаться длительной операции ввода-вывода, по сути, ничего не делая, пока она не завершится. Рассмотрим следующий пример:

В момент времени 0 на этом графике основной поток получает запрос через сетевое соединение. Он обращается к уровню доступа к данным, который, в свою очередь, запрашивает базу данных, используя блокирующий вызов. Соединения 2 и 3 не могут обслуживаться до тех пор, пока этот блокирующий вызов не вернется и не будет отправлен ответ по соединению 1. Это обычно считается плохим, и разработчики, работающие над серверным программным обеспечением, будут всеми силами стремиться избежать этой ситуации. Один из способов избежать этого - создать новый поток для каждого соединения (или, как правило, назначить поток из управляемого пула).

Несмотря на то, что python не является параллельным и не позволяет этим потокам выполняться одновременно, реальность такова, что они проводят большую часть своего времени в ожидании в базе данных, и пока они заблокированы в ожидании ответа, они дают и позволяют другим потокам запускаться. Таким образом, эта стратегия определенно увеличивает количество подключений, которые могут обрабатываться одновременно, и является подходом, используемым основными демонами http, такими как apache и пакетами http-сервера python, такими как flask.

Однако есть проблема с такой архитектурой многопоточного сервера: потоки в Linux имеют изрядные накладные расходы. Эти накладные расходы включают в себя память, которая должна быть выделена для стека потока и любого локального хранилища потока. По некоторым подсчетам это может быть до 8 МБ ОЗУ, выделенной на поток. Это не очень много, если все, что вам нужно, - это несколько подключений. Масштабируйте это до тысяч подключений, и это станет проблемой. При масштабной эксплуатации серверов необходимый объем оперативной памяти очень сильно влияет на затраты. Вместо того, чтобы иметь все эти дорогостоящие потоки, ожидающие подключения к базам данных или другим ресурсам, было бы неплохо, если бы мы могли использовать только один поток, отправляя запросы, а затем возвращаясь для обработки результатов, когда они будут доступны?

Этот шаблон часто называют однопоточным вводом-выводом, управляемым событиями. Он однопоточный, потому что использует один поток. Он управляется событиями, потому что после запуска длинного, блокирующего запроса ввода-вывода поток обработки переходит к обработке других запросов, возвращаясь для обработки результатов каждого только тогда, когда они доступны. Этот бит когда они доступны является частью события. Этот вид программирования также называется асинхронным, потому что отдельные этапы обработки, необходимые для выполнения запроса, не обязательно происходят в последовательном порядке. Это основная модель высокопроизводительных сетевых библиотек, таких как twisted, tornado и node.js.

Это возвращает нас к названию этой статьи. Чтобы построить пример вышеприведенного шаблона, мы создадим очень простой параллельный загрузчик файлов. Чтобы он заработал, нам понадобятся три вещи:

  1. Основной поток для имитации сервера. Мы не будем обрабатывать входящие соединения. Вместо этого мы возьмем ввод из текстового файла, чтобы не усложнять его.
  2. По крайней мере, один рабочий поток, запущенный основным потоком, который выполняет фактическую загрузку. Это имитирует длительный блокирующий ввод-вывод базы данных на иллюстрациях выше.
  3. Некоторые средства передачи результатов рабочего потока обратно в основной поток после завершения ввода-вывода. Результаты могут означать, что файл успешно загружен, или это может означать ошибку.

Чтобы передать данные в наш основной поток, мы возьмем строки из текстового файла:

$ cat test_input.txt
http://download.geonames.org/export/dump/CN.zip
http://download.geonames.org/export/dump/ID.zip
http://download.geonames.org/export/dump/IN.zip
http://download.geonames.org/export/dump/IR.zip
http://download.geonames.org/export/dump/NO.zip
$

Эти URL-адреса указывают на некоторые файлы на geonames.org, размер которых подходит для демонстрации. Если вы не знаете Geonames.org, это отличная организация, которая бесплатно предоставляет много отличной географической информации. Следующий код читает файл и немного подготавливает его содержимое:

input_file = u"test_input.txt"
if __name__ == “__main__”:
  # open the input file, read the urls, for each clean up the
  # text, determine the local filename, etc.
  with open(input_file, “r”) as f:
    for url in f:
      clean_url = url.strip()
      short_url = ‘/’.join(clean_url.split(‘/’)[:-1])
      local_name = clean_url.split(‘/’)[-1]
      print “Downloading {} from {} …”.format(local_name, short_url)

К этому мы должны добавить три вещи: некоторый код для запуска рабочих потоков, место для получения от них событий и цикл для обработки этих событий по мере их поступления. Вот завершенный код для основного потока:

from Queue import Queue
import threading
if __name__ == “__main__”:
  # a queue to receive events for the main thread
  msg_queue = Queue()
# track the thread count against this baseline
  thread_count = threading.active_count()
# open the input file, read the urls, for each clean up the
# text, determine the local filname, and then kick off a worker
  with open(input_file, “r”) as f:
    for url in f:
      clean_url = url.strip()
      short_url = ‘/’.join(clean_url.split(‘/’)[:-1])
      local_name = clean_url.split(‘/’)[-1]
      print “Downloading {} from {} …”.format(local_name, short_url)
      t = threading.Thread(target=download, args=(clean_url,
        local_name, msg_queue))
      t.start()
# keep processing events in the message queue until the thread
# count returns to the baseline
  while threading.active_count() > thread_count:
    event = msg_queue.get()
    event()
print “All downloads completed!”

Первое, что мы делаем, это создаем экземпляр типа Queue и сохраняем его в переменной msg_queue. Очереди - это потокобезопасная структура данных Python, реализующая эксклюзивную очередь FIFO. Только один поток может получить доступ к структуре одновременно, и, поскольку он может хранить любой объект python, он может хранить вызываемые объекты, что будет важно, когда мы дойдем до отправки в него событий.

Затем мы получаем текущее количество активных потоков и сохраняем его в переменной для дальнейшего использования. Мы будем использовать его для координации нашего основного потока, чтобы он продолжал обрабатывать события, пока жив любой из его рабочих потоков. Есть другие и, вероятно, более надежные способы сделать это, но здесь работает простой подход.

В цикл обработки URL-адресов, который был представлен выше, мы добавили код для создания нового потока (тип threading.Thread). В аргумент «target» конструктора мы передаем имя метода (пока не написано), который мы хотим, чтобы он вызывал с новым потоком, в этом случае мы назовем его «загрузка». Мы также передаем набор аргументов. Это URL-адрес, который мы хотим загрузить, имя локального файла, в который мы хотим записать контент, и ссылка на очередь, которую мы создали выше. Затем вызывается метод Thread.start (), который вызывает создание потока с нашим методом загрузки в качестве точки входа. В этом простом цикле мы создаем один поток для каждого URL-адреса в файле, что означает, что если кто-то поместит 10 000 URL-адресов во входной файл, мы получим 10 000 потоков. На практике мы использовали бы управляемый пул потоков.

Есть одна конкретная ситуация, когда несколько потоков действительно очень важны, и это когда нам нужно дождаться длительной операции ввода-вывода ...

Когда потоки созданы и предположительно работают, основной поток переходит в цикл, который выполняется до тех пор, пока количество активных потоков не вернется к тому значению, которое было при запуске. Все, что поток делает в этом цикле, - это вызывает блокирующий метод Queue.get () для получения объекта, который он обрабатывает как вызываемый, а затем вызывает. Это основной «цикл событий» в нашем примере. Теперь давайте создадим рабочий метод, который будет выполнять загрузку и отправку событий в очередь:

from contextlib import closing
import requests
def download(url, local_name, msg_queue):
  “””
  The download worker entrypoint.
  “””
  try:
    with closing(requests.get(url, stream=True)) as response:
      if not response.ok:
        response.raise_for_status()
      with open(local_name, ‘wb’) as local_file:
        for chunk in response.iter_content(chunk_size=1024):
          if chunk:
            local_file.write(chunk)
  except Exception as e:
    msg_queue.put(lambda: on_download_error(url, e.message))
  else:
    msg_queue.put(lambda: on_download_complete(url, local_name))

Прежде всего следует отметить, что сигнатура этого метода соответствует имени цели и аргументам, которые мы передали методу Thread.start () выше. Во-вторых, следует отметить, что мы используем пакет запросы, который представляет собой действительно простую в использовании клиентскую библиотеку http. Он выполнит загрузку за нас. Тип contextlib.closing используется для создания замыкания вокруг вызова request.get, что помогает убедиться, что соединение освобождается при потоковых загрузках, подобных тем, которые мы выполняем. Внутри этого цикла мы просто перебираем блоки данных, возвращаемые в ответе, и записываем их в файл. Если запрос возвращает код состояния http, отличный от OK, мы вызываем это в виде исключения.

Все это заключено в блок try, и именно в обработчиках блока try происходит фактическая отправка событий. Событие отправляется в основной поток путем помещения вызываемого объекта в очередь для его обработки. В этом случае мы определили два «события» как результаты процесса загрузки: on_download_complete и on_download_error. В обоих случаях мы используем лямбда, чтобы создать замыкание вокруг вызываемого объекта и аргументов, которые мы хотим передать ему, иначе метод будет выполнен, а его результат помещен в очередь, а это не то, что мы хотим. Последнее, что нам нужно сделать, это определить два обработчика событий:

def on_download_complete(url, file_name):
  “””
  Callback to be fired on the main thread when the download has
  completed.
  “””
  print “{} completed!”.format(file_name)
def on_download_error(url, error):
  “””
  Callback to be fired on the main thread in case of error.
  “””
  print “{} failed due to {}”.format(url, error)

Они ничего не делают, кроме распечатки некоторых деталей события, чтобы мы могли видеть, что происходит. На этом наш элементарный параллельный загрузчик готов. Вы можете просмотреть полный исходный код Python на github. Итак, давайте запустим его и посмотрим, что произойдет:

$ python download.py
Downloading CN.zip from http://download.geonames.org/export/dump …
Downloading ID.zip from http://download.geonames.org/export/dump …
Downloading IN.zip from http://download.geonames.org/export/dump …
Downloading IR.zip from http://download.geonames.org/export/dump …
Downloading NO.zip from http://download.geonames.org/export/dump …
ID.zip completed!
IR.zip completed!
NO.zip completed!
IN.zip completed!
CN.zip completed!
All downloads completed!
$ ll *.zip
-rw-rw-r — 1 mark mark 20774626 Apr 11 15:56 CN.zip
-rw-rw-r — 1 mark mark 8433991 Apr 11 15:55 ID.zip
-rw-rw-r — 1 mark mark 12322642 Apr 11 15:55 IN.zip
-rw-rw-r — 1 mark mark 10570624 Apr 11 15:55 IR.zip
-rw-rw-r — 1 mark mark 15368628 Apr 11 15:55 NO.zip

Хотя этот пример чрезвычайно прост, тот же самый базовый шаблон и методы используются в высокопроизводительных сетевых серверах, не говоря уже о каждой когда-либо разработанной среде графического интерфейса. Одна из причин, по которой этот шаблон так часто встречается во фреймворках, заключается в том, что, как вы можете видеть, он требует сотрудничества между основным потоком, который обрабатывает события в цикле, и рабочими потоками, которые передают события в основной поток через очередь. Было бы нелегко прикрепить асинхронный обработчик для вызова базы данных, например, к приложению, в котором вы не могли контролировать, что делает основной поток. Это можно сделать, потенциально используя таймеры или сигналы, чтобы прервать основной поток и заставить его обрабатывать события, но это намного более навязчиво и рискованно, чем совместная работа всех частей.