Логика Python ZeroMQ PUSH/PULL, установите верхнюю отметку на нижний пуллер без потери какого-либо сообщения

Я использую простые коды python PUSH/PULL worker/server для отправки и получения сообщений.

Рабочий использует сокет PUSH для отправки сообщений на сервер PULL. Процессорный блок сервера не такой сильный, как рабочий, поэтому при отправке слишком большого количества сообщений оперативная память сервера начинает расти, пока система не убьет все.

Я попытался установить максимальную отметку приемника следующим образом:

worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)

Приведенный ниже код используется рабочим для создания и отправки сообщений:

sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)

И это, кажется, решает проблему, но я предполагаю, что сообщения о переполнении просто отбрасываются сервером, что неприемлемо в моей ситуации.

Я основал свою разработку, используя этот поток, но я не уверен, что понимаю часть ответа Дополнительная информация.

Итак, вопросы заключаются в том, каково реальное поведение HWM, достигаемое на сокетах noblock push/pull zeromq, и есть ли способ иметь инфраструктуру push-pull, которая гарантирует, что все отправленные сообщения будут получены сокетом pull без увеличения его памяти или блокировки отправителя. ?


person aze    schedule 23.12.2019    source источник
comment
Вот аналогичный вопрос и вот ответ, но это не гарантирует, что все отправленные сообщения (таким образом, пропустить отправку). Таким образом, в вашем случае может быть лучшим выбором брокер сообщений, чем ZMQ без брокера.   -  person Benyamin Jafari    schedule 23.12.2019


Ответы (2)


Вопрос : есть ли способ создать инфраструктуру push-pull, гарантирующую получение всех отправленных сообщений через pull-сокет >без увеличения памяти или блокировки отправителя?

Прочь? Да, есть:

Встроенная ZeroWarranty (охватывающая сообщение, доставляемое либо как битовая копия оригинала 1:1, либо не доставляемое вообще) должна быть расширена — либо с помощью протокола уровня приложения (охватывающего повторную отправку для те, которые не были доставлены до тех пор, пока они не будут подтверждены) или перевод вашей инфраструктуры на использование определенного протокола гарантированной доставки, который поможет с этим сверхстандартным требованием - используйте расширение транспортного класса norm:// и переместите парадигму, в случае, когда PUSH/PULL все еще не находится в RTO-состоянии, в PUB/SUB, XPUB/XSUB архетип масштабируемого шаблона формальной коммуникации.

В libzmq доступен новый вариант транспорта. Файлы "norm_engine.hpp" и "norm_engine.cpp" предоставляют реализацию параметра транспортного протокола NACK-Oriented Reliable Multicast (NORM) для ZeroMQ. NORM — это протокол открытых стандартов IETF, указанный в RFC 5740 и сопутствующих документах. Военно-морская исследовательская лаборатория (NRL) предоставляет эталонную реализацию с открытым исходным кодом, которая размещена по адресу http://www.nrl.navy.mil/itd/ncs/products/norm.

NORM поддерживает надежную доставку данных по IP-многоадресной рассылке, а также поддерживает одноадресную передачу данных (точка-точка). . NORM работает поверх протокола пользовательских дейтаграмм (UDP) и поддерживает надежность с помощью автоматического повторного запроса (ARQ) на основе NACK, который использует кодирование стирания пакетов для очень эффективной групповой связи. NORM также обеспечивает автоматическое управление перегрузкой в ​​соответствии с протоколом TCP и механизмы для поддержки сквозного управления потоком. Реализация NRL NORM также может быть настроена для предоставления базовой транспортной услуги с наилучшими усилиями, подобной UDP (без обратной связи с получателем), и это можно улучшить, добавив к передаче некоторое количество устанавливаемых приложением пакетов упреждающей прямой коррекции ошибок (FEC). Т. е. по умолчанию NORM отправляет только «реактивные» пакеты восстановления FEC в ответ на NACK, но также может быть настроен на упреждающую отправку добавленных пакетов восстановления для уровня надежности без какой-либо обратной связи от получателей. В дополнение к его удобный для TCP контроль перегрузки, NORM также может быть настроен для работы с фиксированной скоростью, а реализация NRL поддерживает некоторые дополнительные параметры автоматического управления перегрузкой, подходящие для использования в средах беспроводной связи, подверженных битовым ошибкам. Хотя его надежная операция ARQ в основном основана на NACK (отрицательное подтверждение при обнаружении потери пакета), он также поддерживает необязательное положительное подтверждение (ACK) от получателей, которое можно использовать для подтверждения доставки и явного управления потоком.

Требование Раздувание памяти имеет два пути продвижения вперед: первый — явный контроль для .send()-er, чтобы не перегружать ресурсы Context()-экземпляра (RAM) на стороне .send()er, т.е. в пределах ограничений ограниченных ресурсов (в основном предотвращающих любые лавинные/отброшенные сообщения), другой - иметь достаточно оперативной памяти и правильно настроенный Context()-экземпляр, чтобы пропускать все данные.


Вопрос : Каково реальное поведение HWM, достигнутого на сокетах noblock push/pull zeromq?

Во-первых, давайте демистифицируем это. Директива ZMQ_NOBLOCK указывает локальной, .send()-стороне Context() немедленно вернуть вызов .send()-метода обратно вызывающей стороне, т.е. не блокируя выполнение вызывающего кода (сообщения-полезные нагрузки помещаются для дальнейшей обработки внутри локального ZeroMQ Context()- экземпляр, независимо от его внутреннего состояния — классический неблокирующий код-дизайн).

ZMQ_SNDHWM, напротив, инструктирует Context()-экземпляр, как должны быть установлены пороги этого сокета и для указанного случая PUSH/PULL .send()-er:

высокая отметка — это жесткое ограничение на максимальное количество ожидающих сообщений, которые ØMQ должен поставить в очередь в памяти для любого однорангового узла, с которым взаимодействует указанный сокет. Нулевое значение означает отсутствие ограничений.

Если этот предел достигнут, сокет переходит в исключительное состояние, и в зависимости от типа сокета ØMQ предпринимает соответствующие действия, такие как блокировка или удаление отправленных сообщений. Обратитесь к описаниям отдельных сокетов в zmq_socket(3) для получения подробной информации о точном действии, предпринятом для каждого типа сокета.

ØMQ не гарантирует, что сокет примет столько сообщений, сколько ZMQ_SNDHWM, а фактическое ограничение может быть на 60-70% ниже в зависимости от потока сообщений на сокете.

Использование также ZMQ_XPUB_NODROP может помочь в случаях использования norm://-транспортного класса.

Также имейте в виду, что по умолчанию API ZMQ_PUSH-сокетов подтверждает, что:

Когда сокет ZMQ_PUSH переходит в состояние отключения звука из-за достижения верхней отметки для всех нижестоящих узлов или если нижестоящих узлов нет вообще, тогда любые операции zmq_send(3) на сокете должны блокироваться. пока состояние отключения звука не закончится или хотя бы один нижестоящий узел не станет доступным для отправки; сообщения не удаляются.


Для подозреваемых в недостаточной производительности (сторона PULL) также проверьте настройки правильного размера на стороне O/S, используя .getsockopt( ZMQ_RCVBUF )-метод и адаптируя размер с правильным, достаточно большим .setsockopt( ZMQ_RCVBUF ), по мере необходимости :

Параметр ZMQ_RCVBUF должен установить размер приемного буфера базового ядра для сокета на указанный размер в байтах. Значение -1 означает, что ОС по умолчанию остается без изменений. Дополнительные сведения см. в документации по операционной системе для параметра сокета SO_RCVBUF.


Если ничего из вышеперечисленного не помогло, можно внедрить метаплан самодиагностики в инфраструктуру ZeroMQ, используя сервисы zmq_socket_monitor, и получить полный контроль над ситуациями, которые обычно происходят вне поля зрения кода приложения. (отражающие внутренние состояния API и переходы по мере необходимости).

Решение за вами.

person user3666197    schedule 24.12.2019
comment
В нем говорится, что messages are not discarded когда ZMQ_PUSH переходит в состояние отключения звука, это тот случай, когда он используется с параметром NOBLOCK? Я рассмотрю NORM, который звучит очень многообещающе. - person aze; 24.12.2019
comment
Директива NOBLOCK связана с тем, как вызов будет возвращаться на сторону вызывающего абонента .recv(), а не с внутренним состоянием конечной точки Socket (лучше всего перечитать документацию API, об обработке косвенного < i>errno-переменная для отражения внутреннего FSA-состояния) - person user3666197; 29.12.2019

Я предлагаю вам добавить брокера посередине (между отправителем и получателем), который будет сохранять отправленные сообщения в течение заданного времени. вы будете обязаны сделать логику кода, которая будет сохранять сообщения и получать уведомления, когда сервер не получит определенное сообщение. 0mq не предоставляет способа сохранить или восстановить потерянное сообщение.

person the devops guy    schedule 23.12.2019