Тайм-ауты теневого обновления AWS IoT Python Device SDK

Я уже некоторое время занимаюсь разработкой теневых обновлений AWS IoT для наших устройств. Мы используем SDK для устройств AWS IoT Python, расположенный по адресу https://github.com/aws/aws-iot-device-sdk-python в ARM-дистрибутиве Linux (созданном с помощью Yocto / Bitbake) и Python 2.7.3 (это единственная доступная версия, предоставляемая нашим поставщиком оборудования для Yocto, которую мы использовать для нашей платы ARM).

Я загрузил образец кода теневого обновления из репозитория AWS, изменил его в соответствии с нашими потребностями и поместил в нашу сборку устройства. Я бы сказал, что в большинстве случаев он работает, но имеет частоту отказов, с которой мне все еще некомфортно. Обратите внимание: в этом коде я добавил параметр «OfflinePublishQueueing», который будет обрабатывать, когда устройству требуется время для подключения. Это было рекомендовано одним сайтом для предотвращения ошибок PublishQueueDisabled, которые мы получали. Вот код инициализации. Добавлю, что знаю, что сертификаты, которые мы используем, хороши, иначе успеха не было бы.

self.AWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient("InCoIoTShadowUpdate")
self.AWSIoTMQTTShadowClient.configureEndpoint(endpoint, 8883)
self.AWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyFile, 
jointCertificateFile)
# AWSIoTMQTTShadowClient configuration
self.AWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
self.AWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10)  # 10 sec
self.AWSIoTMQTTShadowClient.configureMQTTOperationTimeout(15)  # 15 sec
MQTTClient = self.AWSIoTMQTTShadowClient.getMQTTConnection()
MQTTClient.configureOfflinePublishQueueing(5, DROP_OLDEST)

А позже код для обновления тени с учетом того, что ThingId установлен в другом месте из конфигурации локального устройства.

def ConnectAndUpdate(self):

    deviceState = InovaIoTDeviceState()
    log = logging.getLogger("InovaIoTDeviceClient:connectAndUpdate")

    # Connect and subscribe to AWS IoT
    try:
        self.AWSIoTMQTTShadowClient.connect()
    except (connectError, connectTimeoutException):
        log.error("Error connecting to AWS IoT service")
        return False

    # Create a deviceShadow with persistent subscription
    updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)
    JSONPayload = deviceState.GetDeviceShadowDocument()

    try:
        updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
    # This is bad if these errors are thrown, probably either an initial device registration failure
    # or global problem with Inova AWS IoT console configuration or lambda function
    except (publishError, subscribeError):

        log.error('Publish or subscribe error..')
        return False

    except (publishTimeoutException, subscribeTimeoutException):

        # It is possible we are here due to a temporary snafu in AWS
        log.error('Publish or subscribe timeout..')
        return False

    except publishQueueDisabledException:

        # From time to time, AWS will randomly disconnect.
        log.error('Publish Queue disabled..')
        return False

    while not self.ResponseReceived:
        time.sleep(1)

    # reset state
    self.ResponseReceived = False

    try:
        self.AWSIoTMQTTShadowClient.disconnect()
    except (disconnectError, disconnectTimeoutException):
        log.error('Error attempting to disconnect')

    return self.UpdateSuccess

и, наконец, код обратного вызова

# Custom MQTT message callback
def ShadowUpdateCallback(self, payload, responseStatus, token):
    log = logging.getLogger("InovaIoTDeviceClient:shadowUpdateCallback")
    if responseStatus == "timeout":
        log.info("Shadow update timeout")
        self.UpdateSuccess = False
    elif responseStatus == "accepted":
        log.info("Shadow update successful")
        self.successive_errors = 0
        self.UpdateSuccess = True
    elif responseStatus == "rejected":
        log.info("Rejected shadow update")
        self.UpdateSuccess = False

    self.ResponseReceived = True

Вот различные ошибки, которые мы получаем, которые встречаются довольно часто. Я бы сказал, что вероятность успеха теневого обновления составляет всего около 60%. AWS сообщает о тайм-ауте:

2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow) 
Subscribed to update accepted/rejected topics for deviceShadow: 
qqpba4fgsfazl2zfgqq8zkavj (Line:372)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Offline 
publish request detected. (Line:343)
2017-09-06 11:15:13: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Try 
queueing up this request... (Line:347)
2017-09-06 11:15:28: (INFO:AWSIoTPythonSDK.core.shadow.deviceShadow) Shadow 
request with token: InCoIoTShadowUpdate_qqpba4fgsfazl2zfgqq8zkavj_0_acbxa 
has timed out. (Line:202)
2017-09-06 11:15:28: (INFO:InCoIoTDeviceClient:shadowUpdateCallback) Shadow 
update timeout (Line:188)

Вот ошибка тайм-аута из фактического вызова обновления тени

2017-09-06 12:40:11: (INFO:AWSIoTPythonSDK.core.protocol.mqttCore) Connected 
to AWS IoT. (Line:302)
2017-09-06 12:40:27: (ERROR:AWSIoTPythonSDK.core.protocol.mqttCore) No 
feedback detected for subscribe request 1. Timeout and failed. (Line:413)
2017-09-06 12:40:27: (ERROR:InCoIoTDeviceClient:connectAndUpdate) Publish 
or subscribe timeout.. (Line:147)
2017-09-06 12:40:27: (INFO:cycle) Unsuccessful shadow update... (Line:173)

Итак, это таймауты, которые, по сути, являются всеми ошибками. В качестве дополнительной проблемы мы усилили подключение нашего устройства и попытки публикации не для просмотра содержимого (поскольку оно зашифровано по протоколу TLS 1.2), а для просмотра поведения подключения. Мы отметили, что поведение, которое мы видим из выходных данных wirehark, заключается в том, что конечная точка AWS обслуживается как минимум 8 различными IP-адресами. В случаях тайм-аута мы всегда видим, что соединение происходит где-то. В случае, если мы подписываемся и публикуем в теме теневого обновления, но получаем тайм-аут, он подключается только к одному из адресов. В течение тайм-аута подписки или публикации он пробует три разных адреса.

Может быть, все это на AWS и производительности на их стороне, но мне было интересно, видел ли кто-нибудь это и мог ли это обойти. Увеличение значений тайм-аута, похоже, не помогает.


person Kevin Hirst    schedule 07.09.2017    source источник
comment
Если кто-то еще добился успеха с более свежими версиями Python, это тоже может быть актуально. Возможно, нам нужно как-то обновить до 2.7.11, но здесь это довольно сложная задача. Или, возможно, выяснить, как обновить библиотеку запросов.   -  person Kevin Hirst    schedule 07.09.2017
comment
Запросы Python обновлены до версии 2.18.4 с urllib3. Никаких изменений в поведении.   -  person Kevin Hirst    schedule 12.09.2017


Ответы (1)


Итак, мы с этим разобрались. A.) Мы обновились до недавно выпущенного 1.2.0 AWS IoT Python SDK, и B.) Похоже, они исправили ошибку, которая позволила мне использовать онлайн-обратный вызов при получении MQTT CONNACK. Поэтому я изменил свой код на этот:

    try:
        self.AWSIoTMQTTShadowClient.onOnline = self.onConnect
        self.AWSIoTMQTTShadowClient.connect()
    except (connectError, connectTimeoutException):
        log.error("Error connecting to AWS IoT service")
        return False

    while not self.is_aws_connected:
        self.seconds_waited_for_conn_aws += 1
        if self.seconds_waited_for_conn_aws > 30:
            log.error("Timeout waiting for connected status.")
            return False
        time.sleep(1)

    # Create a deviceShadow with persistent subscription
    updateBot = self.AWSIoTMQTTShadowClient.createShadowHandlerWithName(self.ThingId, True)

    try:
        updateBot.shadowUpdate(JSONPayload, self.ShadowUpdateCallback, 15)
    # This is bad if these errors are thrown, probably either an initial device registration failure
    # or global problem with Inova AWS IoT console configuration or lambda function
    except (publishError, subscribeError):

        log.error('Publish or subscribe error..')
        return False

    except (publishTimeoutException, subscribeTimeoutException):

        # It is possible we are here due to a temporary snafu in AWS
        log.error('Publish or subscribe timeout..')
        return False

    except publishQueueDisabledException:

        # From time to time, AWS will randomly disconnect.
        log.error('Publish Queue disabled..')
        return False

    while not self.ResponseReceived:
        time.sleep(1)

    # reset state
    self.ResponseReceived = False
    return self.UpdateSuccess

def onConnect(self):
    log = logging.getLogger("InovaIoTDeviceClient:onConnect")
    log.info("Callback from AWS layer on connect.")
    self.is_aws_connected = True

def runOnce(self):
    try:
        log = logging.getLogger("cycle")
        if not self.ConnectAndUpdate():
            log.info("Unsuccessful shadow update...")
    finally:
        # only here to make sure we disconnect
        try:
            self.AWSIoTMQTTShadowClient.disconnect()
            self.is_aws_connected = False
        except (disconnectError, disconnectTimeoutException):
            log.error('additional error attempting to disconnect')

Как видите, я устанавливаю обратный вызов «onOnline» в первой строке кода и жду завершения этого статуса. Как и раньше, я часто ждал, пока система будет помечена как СТАБИЛЬНАЯ в основе AWS SDK (из worker.py в AWS SDK).

def _dispatch_connack(self, mid, rc):
    status = self._client_status.get_status()
    self._logger.debug("Dispatching [connack] event")
    if self._need_recover():
        if ClientStatus.STABLE != status:  # To avoid multiple connack dispatching
            self._logger.debug("Has recovery job")
            clean_up_debt = Thread(target=self._clean_up_debt)
            clean_up_debt.start()
    else:
        self._logger.debug("No need for recovery")
        self._client_status.set_status(ClientStatus.STABLE)

По-видимому, вы не можете рассчитывать на то, что система MQTT будет полностью подключена и готова к работе с другой стороны вызова connect (), но вы можете, если дождетесь события onOnline. Вот почему раньше нам приходилось вызывать configureOfflinePublishQueueing (). И почему мы увидим это в логах ...

2017-09-19 14:45:13: (INFO:AWSIoTPythonSDK.core.protocol.mqtt_core) Offline request detected! (Line:313)

К счастью, теперь это работает намного лучше. Между прочим, старый код был НАМНОГО более успешным (98% успеха на настольных виртуальных машинах Linux, 45% успеха на небольшой плате ARM) на более мощных процессорах, поэтому я думаю, что их система "офлайн-публикации" испытывала условия гонки на нашем маленьком ARM плата процессора.

person Kevin Hirst    schedule 19.09.2017