Получение сообщений из темы в концентраторе сообщений

Я пытаюсь получить сообщения из темы в концентраторе сообщений на bluemix, используя Confluent Kafka Python. Мой код находится ниже, но что-то не работает. Тема и концентратор сообщений запущены и работают, так что, вероятно, что-то с кодом.

from confluent_kafka import Producer, KafkaError, Consumer

consumer_settings = {
'bootstrap.servers': 'broker-url-here',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'ssl',
'sasl.username': 'username-here',
'sasl.password': 'password-here',
}

c = Consumer(**consumer_settings)
c.subscribe(['topic-here'])

running = True

while running:
    msg = c.poll()
    if msg.error():
        print("Error while retrieving message")
        c.close()
        sys.exit(10)
    elif (msg is not None):
        for x in msg:
            print(x)
    else:
        sys.exit(10)

Когда я запускаю код, кажется, что он застревает на msg = c.poll(). Поэтому я предполагаю, что либо не удается подключиться, либо не удается получить сообщения. Сами реквизиты правильные.


person jawwe    schedule 16.03.2018    source источник


Ответы (1)


Логика потребления выглядит нормально, но конфигурация для потребителя неверна.

  • security.protocol необходимо установить на sasl_ssl
  • ssl.ca.location needs to point to a PEM file containing trusted certificates. The location of that file varies for each OS, but for the most common it's:
    • Bluemix/Ubuntu: /etc/ssl/certs
    • Красная Шляпа: /etc/pki/tls/cert.pem
    • macOS: /etc/ssl/certs.pem

У нас также есть пример приложения, использующего этот клиент, который можно легко запустить или развернуть в Bluemix: https://github.com/ibm-messaging/message-hub-samples/tree/master/kafka-python-console-sample

person Mickael Maison    schedule 19.03.2018