Как правильно прервать context.socket.recv() в ZeroMQ?

У меня есть небольшое программное обеспечение, в котором есть отдельный поток, ожидающий сообщений ZeroMQ. Я использую протокол связи PUB/SUB ZeroMQ.

В настоящее время я прерываю этот поток, устанавливая переменную "cont_loop" в False.

Но я обнаружил, что, когда подписчику ZeroMQ не приходят сообщения, я не могу выйти из потока (без отключения всей программы).

def __init__(self):
    Thread.__init__(self)
    self.cont_loop = True

def abort(self):
    self.continue_loop = False

def run(self):
    zmq_context = zmq.Context()
    zmq_socket = zmq_context.socket(zmq.SUB)
    zmq_socket.bind("tcp://*:%s" % *(5556))
    zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
    while self.cont_loop:
        data = zmq_socket.recv()
        print "Message: " + data
    zmq_socket.close()
    zmq_context.term()
    print "exit"

Я попытался переместить socket.close() и context.term() в метод прерывания. Так что он отключает абонента, но это убило всю программу.

Каков правильный способ закрыть вышеуказанную программу?


person Easyrider    schedule 04.06.2015    source источник


Ответы (1)


Q: Каков правильный способ...?

О: Есть много способов достичь поставленной цели. Позвольте мне выбрать только один в качестве макета того, как обрабатывать распределенный обмен сообщениями между процессами.

Первый. Предположим, что в типичной задаче проектирования программного обеспечения есть больше приоритетов. Некоторые более высокие, некоторые более низкие, некоторые даже настолько низкие, что можно отложить выполнение этих низкоприоритетных подзадач, чтобы в планировщике оставалось больше времени для выполнения тех подзадач, которые не выдерживают ожидания.

Это сказало, давайте просмотрим ваш код. SUB-сторонняя инструкция для .recv() в том виде, в каком она использовалась, вызывает две вещи. Один видимый — он выполняет операцию RECEIVE на сокете ZeroMQ с поведением SUB. Второй, менее заметный, он остается висящим, пока не получит что-то «совместимое» с текущим состоянием SUB-поведения (подробнее об этом позже).

Это означает, что он также БЛОКИРУЕТ все время, так как такой .recv() вызов метода ПОКА какое-то неизвестное, локально неконтролируемое совпадение состояний/событий заставляет его доставить ZeroMQ-сообщение, с его содержание совместимо с локально предустановленным состоянием этого (по-прежнему блокирующего) экземпляра SUB-поведения.

Это может занять целую вечность.

Именно поэтому .recv() скорее используется внутри цикла управления, где внешняя обработка получает как возможность, так и ответственность делать то, что вы хотите (включая операции, связанные с прерыванием, и справедливое/мягкое завершение с надлежащим освобождением ресурсов). ).

Процесс получения становится .recv( flags = zmq.NOBLOCK ) в try: except: эпизоде. Таким образом, ваш локальный процесс не теряет контроль над потоком событий (включая NOP, являющийся одним из таких).

Лучший следующий шаг?

Не торопитесь и прочтите великолепную книгу драгоценных камней «Code Connected, Volume 1», которую опубликовал Питер ХИНТДЖЕНС, соучредитель ZeroMQ (также как PDF ).

Многие его мысли и ошибки, которых следует избегать, которыми он поделился с нами, действительно стоят вашего времени.

Наслаждайтесь возможностями ZeroMQ. Это очень мощный инструмент, и его стоит освоить сверху вниз.

person user3666197    schedule 06.06.2015
comment
Большое спасибо за ответ! Очень информативно. - person Easyrider; 18.06.2015