Эта статья посвящена созданию асинхронных микросервисов. Я сравню, как этого можно достичь в Javascript и Erlang изначально, и в Python с использованием RabbitMQ и Celery.

Но почему?

Мое первое знакомство с асинхронным программированием на Python было при создании веб-сервера. После совершения покупки пользователь должен в конечном итоге получить счет в формате PDF по электронной почте. Это не должно происходить сразу во время запроса; на самом деле, было бы лучше, чтобы этого не произошло, чтобы без нужды не замедлять покупку. В то время я не знал, как реализовать асинхронный рабочий процесс на Python, но быстрый поиск в Google быстро привел меня к Celery и RabbitMQ. Сельдерей очень удобен в употреблении; единственная боль - это настройка брокера сообщений - в моем случае RabbitMQ. После настройки запускать задачу в фоновом режиме так же просто, как писать: myapp.py,

from celery import Celery 
app = Celery('myapp', broker='amqp://localhost:5672') 
# create celery 'app', takes as input the URL to RabbitMQ 
@app.task() def my_function(value):
    print("Print {} asynchronously".format(value)) 
if __name__ == "__main__": 
    my_function.delay("me") 
    # runs print("Print me asynchronously") asynchronously!

… И запустив воркер, используя команду

celery -A myapp worker

Затем запуск python myapp.py будет запускать my_function асинхронно в только что запущенном рабочем процессе.

Вот что я сделал с моим кодом выставления счетов вместо my_function, и я успешно реализовал остальную часть серверной части. Однако это заставило меня задуматься, как можно и нужно использовать сельдерей. Я знал, что RabbitMQ реализовал очередь, в которой будут храниться сообщения, и что работник сельдерея извлекал и обрабатывал сообщения в этой очереди одно за другим, пока они не закончились. Но как еще можно использовать сельдерей и rabbitmq? И как асинхронное программирование работает в других языках и в других контекстах?

Электронная почта Cruncher

Я проиллюстрирую микросервисы и асинхронное программирование на javascript, erlang и python на простом примере - email cruncher. Вы можете вставить конфиденциальное сообщение в текстовое поле, и обработчик электронной почты сообщит вам, сколько писем он нашел в нем!

Код для этого здесь:

Архитектура очень простая. Всего есть 3 микросервиса:

  • загрузчик, который считывает данные из файла или текстового поля и передает их
  • cruncher, который находит электронные письма в данных с помощью регулярного выражения и подсчитывает их, передавая счет в
  • средство отображения статуса, которое ведет подсчет общего количества когда-либо найденных электронных писем и отображает его пользователю.

У каждого из этих микросервисов должен быть свой собственный «почтовый ящик», очередь для входящих сообщений, чтобы обрабатывать их одно за другим.

Так что не забывайте об этом примере, пока мы идем дальше 😄

JavaScript

Javascript - очевидный пример асинхронного программирования. Большинство современных веб-сайтов активно используют JavaScript AJAX, что означает асинхронный javascript и xml. AJAX - это способ для пользователя выполнить запрос к внешнему серверу без зависания веб-сайта - конкретный пример «асинхронного ввода-вывода». В коде это реализовано с помощью обратных вызовов. Вместо того, чтобы писать код, который делает запрос, ожидает ответа и затем что-то с ним делает, вы вместо этого пишете код, который делает запрос и указывает, какая функция обратного вызова должна быть запущена при возврате запроса. Пример:

var xmlhttp = new XMLHttpRequest(); 
xmlhttp.onreadystatechange = function() { 
  console.log("Execute some code"); 
}; 
xmlhttp.open("GET", "my_resource.html", true); 
xmlhttp.send();

Что это значит? Сначала мы устанавливаем функцию обратного вызова, которая запускается при возникновении события onreadystatechange, а затем запускаем фактический запрос. Функция обратного вызова будет запущена, когда запрос завершится и произойдет событие onreadystatechange.

Но что за событие? Событие - это полезная абстракция javascript, реализация которой зависит от браузера. Все, что нам нужно знать, это то, что существует целый ряд «событий», к которым мы можем прикреплять обратные вызовы. Наиболее важные из них: изменения в модели DOM из-за взаимодействия с пользователем и ответы серверов. Любое такое реальное событие запускает событие javascript. Детали реализации не должны нас сейчас беспокоить - вместо этого, возникает интересный вопрос, учитывая, что мы можем прикреплять функции обратного вызова к событиям, как и когда именно эти обратные вызовы запускаются? Все ли они выполняются в одном процессе или потоке и в какое время? В каком порядке выполняются обратные вызовы после их запуска и кто отвечает за их запуск?

Как уже упоминалось, события представляют собой полезную абстракцию для того, что «что-то происходит» в нашем браузере или на вкладке браузера. Браузер обнаруживает, например, что запрос вернулся или что пользователь наводит указатель мыши на определенную кнопку. Это соответствует событию с определенным именем, например onchange или onmouseover. Если в нашем коде javascript мы установили обратный вызов для одного из этих событий, браузер запускает соответствующий код. Так что же, если одновременно или почти одновременно происходит много событий? Что произойдет, если во время выполнения одного обратного вызова произойдет другое событие? Обрабатываются ли они одновременно с использованием потоков или один за другим и в каком порядке?

Оказывается, в javascript есть отдельная очередь сообщений, в которую ставятся обратные вызовы. Любой код javascript, который вы пишете, происходит в одном потоке. Этот поток реализует цикл обработки событий: он ожидает следующего сообщения в очереди, а затем обрабатывает его.

while (queue.waitForMessage()) { 
  queue.processNextMessage(); 
}

В частности, всякий раз, когда происходит событие javascript, соответствующий обратный вызов добавляется в очередь сообщений, если он определен.

Одно из следствий этого состоит в том, что никакие две функции обратного вызова, которые вы определяете, никогда не выполняются одновременно. Допустим, вы определяете функцию modifyState, которая увеличивает некоторое глобальное состояние counter:

var counter = 0; // global variable 
var mycallback = function(e) { 
  console.log('Value of counter is: ' + counter.toString()); 
  counter += 1; 
  console.log('Value of counter is: ' + counter.toString()); 
} 
var mousebutton = document.getElementById("my-button"); mousebutton.addEventListener("mouseover", mycallback);

Тогда вы можете быть уверены, что при наведении курсора на my-button консоль будет печатать N, а затем N + 1 по порядку - что бы ни случилось. В более общем смысле, ваш обратный вызов может предполагать, что состояние вашей программы не будет изменено никаким другим обратным вызовом.

Множественные среды выполнения

Каждый iframe в вашем HTML или javascript worker имеет свою собственную очередь сообщений, стек и кучу. Это означает, что мы можем реализовать параллельные «микросервисы» в javascript, запустив новых рабочих процессов javascript.

var my_worker = new Worker('my-worker.js') //spin up a new worker 
// could also be an iframe, e.g. 
// var my_iframe = document.getElementById('my-iframe'); 
var my_message = {message: 'MyMessage'} my_worker.postMessage(my_message) // post message to worker

Email Cruncher в Javascript

Время для электронной почты! Теперь мы знаем, что каждая среда выполнения javascript - включая основной скрипт и каждого порождаемого им рабочего - имеет свою собственную очередь сообщений. Таким образом, мы можем попытаться реализовать архитектуру почтового обработчика с использованием отдельного воркера для микросервиса "cruncher". Всего у нас будет 2 микросервиса, каждый со своей очередью:

  • Основной микросервис в «main.js», отвечающий за чтение «загруженных» файлов и обновление DOM обновленным общим количеством найденных писем.
  • Микросервис cruncher в «email_worker.js», который принимает на входе строку, находит в ней электронные письма с помощью регулярного выражения и выводит результат.

Я реализовал это в https://github.com/egeromin/async-experiments/tree/master/javascript. Ознакомьтесь с README для получения инструкций по запуску.

Архитектура на самом деле немного отличается от изначально запланированной, поскольку мы «объединили» 2 микросервиса, загрузчик и индикатор состояния. Вместо этого это выглядит так:

Код для подсчета общего количества найденных писем - это обратный вызов события onmessage в main.js. Он считывает текущее количество писем из DOM, добавляет количество новых писем, объявленное в сообщении, а затем обновляет DOM. Обратите внимание, что этот обратный вызов использует тот факт, что в javascript одновременно выполняется только 1 обратный вызов. В противном случае это не было бы надежным способом обновления общего количества писем, поскольку общее количество в DOM могло измениться из-за другого экземпляра обратного вызова в каком-то другом потоке. Однако, опять же, это не проблема, поскольку одновременно выполняется только один обратный вызов, и поэтому условий гонки нет.

Erlang

Erlang - это язык для программирования приложений с высокой степенью параллелизма и асинхронности. По данным http://www.erlang.org,

Erlang - это язык программирования, используемый для создания масштабируемых программных систем реального времени с требованиями высокой доступности. Некоторые из его применений - в телекоммуникациях, банковском деле, электронной коммерции, компьютерной телефонии и обмене мгновенными сообщениями. Система времени выполнения Erlang имеет встроенную поддержку параллелизма, распределения и отказоустойчивости.

Бэкэнд WhatsApp написан на Erlang!

Erlang имеет ряд интересных особенностей:

  • Однократное размещение. В функции вы можете присвоить значение переменной только один раз. Это значительно упрощает представление о состоянии, поскольку значение переменной не может быть изменено после того, как оно было присвоено.
  • Нет для петель. Как и во многих других языках функционального программирования, здесь нет циклов for, поэтому для реализации циклов необходимо использовать рекурсию. Это заставляет вас думать и разрабатывать свою программу в терминах рекурсии.

Основная причина, по которой я исследую это здесь, заключается в том, что очень легко создавать новые «подпроцессы Erlang». Это не настоящие подпроцессы ОС, поскольку erlang работает внутри виртуальной машины под названием BEAM. Эта виртуальная машина использует потоки, но не обязательно по одному потоку на каждый подпроцесс erlang, поскольку именно виртуальная машина планирует задачи, а не базовая ОС. Джо Армстронг, создатель Erlang, объясняет процессы Erlang:

  • Все есть процесс.
  • Процессы сильно изолированы.
  • Создание и уничтожение процесса - это легкая операция.
  • Передача сообщений - единственный способ взаимодействия процессов.
  • У процессов есть уникальные имена.
  • Если вы знаете название процесса, вы можете отправить ему сообщение.
  • У процессов нет общих ресурсов.
  • Обработка ошибок нелокальная.
  • Процессы делают то, что должны, или терпят неудачу.

См. Более подробную информацию в Кандидатской диссертации Джо Армстронга.

Так, в частности, каждый подпроцесс Erlang имеет свой собственный «почтовый ящик»: очередь, в которую мы можем отправлять сообщения, которая может быть любым объектом Erlang или term.

Message_pid ! {a_message, "Message string"}, 
% Send a message to the process with id `Message_pid`

Email Cruncher на Erlang

Это означает, что Erlang идеально подходит для написания версии нашего почтового обработчика! Я реализовал версию обработчика электронной почты для командной строки с использованием erlang в https://github.com/egeromin/async-experiments/tree/master/erlang. Взгляните 😉. Стоит отметить несколько моментов:

  • Микросервисы сразу узнаваемы по коду: prompt, emailExtractor и resultDisplayer.
  • Чтобы реализовать бесконечный цикл, я должен использовать рекурсию во всех трех микросервисах.
  • Сопоставление с шаблоном используется для определения того, какое действие я должен выполнить в микросервисе. На данный момент есть только 2 варианта: process или quit. Идея использования атомов для обозначения «типа» сообщения является распространенным шаблоном в Erlang.
  • Из-за единственного присваивания Erlang невозможно сохранить глобальное состояние в переменной. Чтобы сохранить промежуточную сумму в resultDisplayer, я должен использовать рекурсию.

Полезные ссылки:

Интерлюдия: AMQP и RabbitMQ

Теперь, когда мы изучили асинхронное программирование и микросервисы на javascript и erlang, которые изначально поддерживают передачу сообщений и очереди, как насчет других языков, таких как python, которые не имеют этой функции?

Одно из решений - использовать брокер сообщений. Посредники сообщений - это программное обеспечение, специально предназначенное для отправки сообщений между различными процессами / программами. Вместо того, чтобы отправлять сообщения другим процессам напрямую через сокеты или каналы, вы отправляете их брокеру, который действует как посредник. Преимущество использования брокера сообщений заключается в том, что вам больше не нужно беспокоиться о деталях отправки сообщений, таких как постановка в очередь и маршрутизация, и вы можете делегировать эти задачи брокеру. Например, вы можете указать брокеру отправить сообщение в определенную очередь или набор очередей. Обычно брокеры сообщений реализованы как серверы, с которыми вы можете взаимодействовать через TCP, используя настраиваемый протокол.

Существует много разных брокеров сообщений, и я ограничусь кратким обсуждением только одного: RabbitMQ, который реализует AMQP, Расширенный протокол очереди сообщений. Лучшее введение в это - отличный набор руководств RabbitMQ, которые я настоятельно рекомендую прочитать. Вот краткое изложение:

Очереди, производители и потребители

RabbitMQ реализует очереди. Новые очереди должны быть объявлены с помощью вызова API AMQP. Очереди можно идентифицировать с помощью ключа привязки, который используется для маршрутизации с использованием обменов (подробнее ниже). Производители могут помещать сообщения в очередь, а потребители могут потреблять их по одному. И производство, и потребление соответствуют определенным действиям в протоколе AMQP. Очереди хранятся в памяти, но могут сохраняться на диске, если они должным образом объявлены.

Биржи

На самом деле продюсеры не публикуют напрямую в очереди. Вместо этого они публикуются на биржах, которые отвечают за маршрутизацию. Обмены отличаются от очередей тем, что в них нет памяти. Их роль - просто направлять сообщения в очереди на основе ключа маршрутизации. Если к конкретному обмену не прикреплены очереди, то любые сообщения, опубликованные для этого обмена, будут просто потеряны.

В AMQP существует 3 основных типа бирж:

  • fanout - сообщение передается в каждую очередь, прикрепленную к этому обмену.
  • direct - сообщение передается в очередь, ключ привязки которой точно совпадает с ключом маршрутизации сообщения.
  • тема - сообщение передается во все очереди, шаблон ключей привязки которых совпадает с ключом маршрутизации сообщения. Например, шаблон media.* соответствует как media.video, так и media.audio.

Биржи позволяют нам реализовать публикацию / подписку, при которой ряд различных служб подписывается на конкретную биржу и выбирают получение сообщений от нее либо выборочно, либо в в случае прямых и тематических обменов или без разбора, как в случае разветвленных обменов.

Причуда предварительной загрузки

По умолчанию очередь выгружает свои сообщения каждому потребителю, который в настоящее время на нее подписан, циклически - до того, как потребитель завершит обработку потребляемой задачи! Это связано с тем, что сообщения отправляются потребителям немедленно, что означает, что они зарезервированы для этого конкретного потребителя, даже если он еще не завершил обработку сообщения. Хотя это гарантирует, что каждый потребитель получает примерно одинаковое количество задач, может возникнуть проблема, если некоторые задачи занимают гораздо больше времени для обработки, чем другие. В этом случае, хотя количество сообщений, полученных каждым исполнителем, будет примерно одинаковым, некоторые рабочие могут в конечном итоге выполнять гораздо больше работы, чем другие. Чтобы воркер получил новое сообщение только тогда, когда оно будет готово, вы должны отключить префинирование.

RPC

RabbitMQ можно использовать для выполнения синхронных вызовов с использованием RPC, что означает удаленные вызовы процедур. Идея состоит в том, чтобы опубликовать задачу для конкретной задачи и указать очередь ответа, то есть очередь, в которую должен быть отправлен результат. Затем вызывающий должен дождаться результата, используя данные из этой очереди ответов. Чтобы увидеть, как это реализовано в Python с использованием pika, ознакомьтесь с этим руководством.

Фактический протокол

Одна из вещей, которые меня интересовали при изучении AMQP, - это как выглядит реальный протокол? Например, HTTP и SMTP являются относительно простыми текстовыми протоколами. RabbitMQ немного сложнее. Если вы чувствуете себя смелым, сразу погрузитесь в официальную спецификацию! В качестве альтернативы, вот суть:

  • AMQP - это бинарный протокол. Двоичные данные отправляются по TCP в единицах, называемых фреймами.
  • Спецификация состоит из 2 частей:
  • «функциональный уровень», который определяет поддерживаемые функциональные возможности в терминах классов и методов. Например, спецификация определяет Queue.Declare метод объявления очередей.
  • «транспортный уровень», который определяет, как преобразовывать эти вызовы методов в кадры двоичных данных.

Для конкретной реализации спецификации ознакомьтесь с библиотекой python pika: https://github.com/pika/pika/blob/master/pika/spec.py

AMQP в Python с использованием Комбу

Я буду использовать Комбу, чтобы использовать RabbitMQ в Python. Несмотря на то, что команда RabbitMQ рекомендовала pika, я выбрал вместо этого Kombu, потому что он используется в Celery. Однако некоторые функции AMQP, доступные в pika, похоже, недоступны в Kombu. Например, я не смог найти способ, чтобы сервер RabbitMQ выбирал случайное имя очереди, что возможно в pika, объявив очередь без имени: channel.queue_declare(). Сделав это предупреждение, вот очень быстрое введение в Комбу.

Пример объявления прямого обмена и публикации на этом обмене

from kombu import Connection, Exchange 
if __name__ == "__main__": 
    rabbit_url = "amqp://username:password@localhost:5778/vhost" 
    exchange = Exchange('my-exchange', type='direct') 
    with Connection(rabbit_url) as connection: 
        producer = connection.Producer((
        producer.publish("some message!", exchange=email_exchange, 
                         declare=[email_exchange], retry=True)

Пример объявления обмена, очереди и потребления из этой очереди:

from kombu import Connection, Exchange, Queue

def message_callback(content, message):
    print("Message content: {}".format(content))  
    # callback to run when we receive a message

if __name__ == "__main__":
    rabbit_url = "amqp://username:password@localhost:5778/vhost"
    with Connection(rabbit_url) as connection:
        exchange = Exchange('my-exchange', type='direct')
        queue = Queue('my-queue', exchange=exchange)
        with Consumer(
            connection,
            queues=[queue],
            callbacks=[message_callback],
            prefetch_count=1  # disable prefetching
        ):
            while True:  # start consuming
                connection.drain_events()
                # `drain_events` blocks until there's a message in the queue,
                # and then consumes 1 message only.

Обновленный Email Cruncher в Python с использованием Комбу

Теперь, когда мы знаем, как использовать RabbitMQ с помощью Kombu, мы можем спроектировать немного более сложный обработчик электронной почты с дополнительными микросервисами.

Вот блок-схема:

Вот наши новые микросервисы:

  • Подсказка. Это как и раньше, за исключением того, что обработчикам передается имя файла, а не само содержимое. Имя файла отправляется в виде сообщения на обмен электронной почтой, разветвленный обмен
  • Быстрые и медленные обработчики электронной почты. Теперь есть 2 кранчера электронной почты, один «быстрый», использующий регулярные выражения, и один «медленный», использующий немного более продвинутый алгоритм. Они находят электронные письма в исходном файле и каждый передает список вновь найденных писем в очередь статуса (через обмен по умолчанию).
  • средство отображения статуса. Это средство отображения статуса отслеживает все электронные письма, которые мы обнаружили, относящиеся к определенному домену, например spam_domain. Он поддерживает глобальное хранилище данных в памяти всех ранее найденных писем. Если он обнаруживает какие-либо новые, для каждого нового он отправляет новое сообщение в обмен спамерами, прямой обмен. Содержание сообщения - «спам мне!» и его ключ маршрутизации должен быть равен локальной части адреса электронной почты, то есть части до '@'.
  • спамер. Спамер должен быть запущен, указав ключ привязки и имя очереди, из которой он должен потреблять. Например, если он запущен с ключом привязки «emanuel», он начинает потреблять из очереди «emanuel» с ключом привязки «emanuel». Эта очередь прикреплена к обмену «спамерами». Если спамер получает сообщение из очереди, он отправляет письмо с предупреждением на адрес «emanuel @ spam_domain».

По общему признанию, этот пример немного надуманный. Например, микросервис рассылки спамеров можно было бы реализовать гораздо проще, если в качестве входных данных использовать адрес электронной почты и рассылать его спамом. Однако этот пример иллюстрирует публикацию / подписку в двух разных случаях:

  1. Новые «обработчики электронной почты» могут быть добавлены путем присоединения новой очереди к обмену электронной почтой и использования из нее. Например, если мы найдем еще один альтернативный алгоритм для извлечения писем, мы можем довольно легко добавить его как новый «модуль».
  2. Если какой-либо пользователь с spam_domain адресом электронной почты не хочет получать электронные письма с предупреждениями, мы можем просто «отписаться» от него, остановив соответствующий экземпляр спамера.

Я реализовал версию этого на Python, используя библиотеку Python Kombu здесь: https://github.com/egeromin/async-experiments/tree/master/python/kombu_version. Взгляните!

(Болезненное признание: «медленный» обработчик на данный момент идентичен обработчику «регулярных выражений», за исключением того, что он спит 5 секунд для каждого сообщения. Однажды я предложу подходящую альтернативу!)

Вместо этого используйте сельдерей

Беглый взгляд на версию Kombu этого расширенного инструментария электронной почты показывает, что существует изрядное количество шаблонного кода. Каждый раз нам приходится подключаться вручную, настраивать потребителя и производителя, объявлять очереди и т. Д. Вот тут и появляется Celery: он выполняет большую часть тяжелой работы и позволяет напрямую определять задачи внутри ваш код, который должен выполняться асинхронно. Я привел пример в начале статьи.

Это означает, что вместо отправки сообщений на биржи в сельдерее вы вызываете задачи асинхронно. Под капотом сельдерей по-прежнему отправляет «текстовые» сообщения через AMQP. Он сериализует словарь в формате JSON, содержащий информацию, которая позволяет ему «находить» нужную задачу на другом конце. Одно большое различие между обычным AMQP и сельдереем заключается в том, что публикация / подписка невозможны с использованием сельдерея. Это потому, что вы вызываете отдельные задачи с помощью сельдерея, а не публикуете на биржах. Celery - это не замена или упрощение обычного AMQP - это другой способ асинхронного программирования.

Вот упрощенная архитектура измельчителя сельдерея:

Я реализовал версию этого в https://github.com/egeromin/async-experiments/tree/master/python/celery_version. Взгляните на него - в частности, на README. Я подожду

Несколько замечаний:

  • Мы можем создать конвейер задач, используя функцию celery chain. Это часть холста сельдерея, набора функций для написания сложных рабочих процессов. Помимо chain, который объединяет задачи, есть также group, который выполняет несколько задач параллельно, и chord, который запускает задачи параллельно, ожидает их завершения и передает список всех результатов одной функции обратного вызова.
  • Шаблон chain не работает, когда мы вызываем спамера для каждого нового письма, которое может быть спамировано - нет функции Canvas, которая принимает в качестве аргумента список и применяет функцию асинхронно к каждому элементу в списке. (map и xmap разные - они вызывают одну асинхронную задачу, которая последовательно применяет функцию к каждому элементу в списке, а затем возвращает результат).
  • По умолчанию вызов воркера с использованием celery -A cruncher worker потребляет из всех очередей, которые мы объявили. Если, как в версии комбу, мы хотим запускать отдельные воркеры для каждой микросервиса, мы должны ограничить очереди, из которых воркер потребляет. Например, чтобы запустить cruncher, мы должны запустить celery -A cruncher worker -Q crunch
  • Микрослужба состояния использует глобальную переменную. Мы должны быть осторожны, потому что вызов celery worker на самом деле запускает 4 подпроцесса, каждый из которых использует очереди - так что фактически 4 рабочих процесса одновременно. Мы хотим убедиться, что это только 1 рабочий и 1 глобальная переменная в памяти «база данных». Мы можем сделать это, запустив статус-воркер с параллелизмом 1: celery -A cruncher worker -Q status -c 1
  • В версии kombu спамер инициализируется локальным именем, которое предполагается рассылать. Здесь, поскольку я не могу передавать аргументы работнику сельдерея, вместо этого я использую переменные среды - отсюда os.getenv("SPAMMER_QUEUE", 'spammer') в cruncher.py.

Тем не менее, несмотря на то, что я не смог реализовать полную версию моего очень надуманного примера электронной почты в Celery, он остается чрезвычайно полезным. Как уже было показано, асинхронный вызов задач очень прост. Создавать сложные конвейеры задач с помощью Canvas очень просто. И наконец, по умолчанию доступен RPC:

@app.task
def add_one(x):
    return x + 1

res = add_one.delay(1)  # returns a 'promise'
result = res.get()  # blocks until the RPC returns
print(result)  # prints '2'

Действительно полезный инструмент для мониторинга сельдерея - цветок. Его очень легко настроить, и он имеет приятный интерфейс, позволяющий увидеть, какие задачи выполнены успешно, а какие нет и т. Д.

Что я не рассказывал

В асинхронном программировании на Python и AMQP есть еще много чего.

  • Asyncio - альтернатива брокерам сообщений для асинхронного программирования на Python. Он реализует цикл событий.
  • Приоритетные очереди. AMQP позволяет вам определять приоритетные очереди с приоритетом 1–10. Сообщения с более высоким приоритетом сначала отправляются работникам.
# priority using celery
queue = Queue('my-queue', queue_arguments={'x-max-priority': 10})  # define a priority queue
task.apply_async({kwarg: "some kwarg"}, priority=4)  # invoke an async task with priority
  • Другие брокеры сообщений, такие как Redis.
  • Больше деталей AMQP, таких как сердцебиение и т. Д.

Вот и все - спасибо, что прочитали!

Первоначально опубликовано на gist.github.com.