Завершить потребитель консоли Kafka, когда все сообщения будут прочитаны

Я знаю, что должен быть способ сделать это, но я не могу понять это. Мне нужно остановить потребителя kafka, как только я прочитаю все сообщения из очереди.

Может ли кто-нибудь предоставить любую информацию об этом?


person divinedragon    schedule 18.12.2014    source источник


Ответы (4)


Вы можете передать параметр: -consumer-timeout-ms со значением при запуске потребителя, и он выдаст исключение, если за это время не было прочитано ни одного сообщения. Например, чтобы остановить потребителя, если за последние 2 секунды не поступило новых сообщений: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000

Вы можете увидеть это и все остальные введите параметры здесь

person Lundahl    schedule 18.12.2014
comment
Можете ли вы поделиться полной командой? ./kafka-console-consumer.bat принимает параметр -property. Этот, похоже, не работает. - person Ujjwal Wadhawan; 05.03.2016
comment
Большое спасибо @Lundahl - person Khan; 11.07.2016
comment
Полная команда будет выглядеть примерно так: ./kafka-console-consumer.sh --bootstrap-server my-host:9092 --new-consumer --topic test-topic --from-beginning --timeout-ms 2000 Я пробовал, и она работает нормально, хотя при выходе выводит сообщение об ошибке и трассировку стека. - person bischoje; 03.11.2017

В настоящее время в Kafka версии 2.11-2.1.1 есть скрипт с именем kafka-console-consumer.sh.

У него новый флаг: --timeout-ms.

По сути, этот флаг представляет собой максимальное время ожидания перед выходом, когда нет нового журнала для ожидания. Это в миллисекундах.

Вы можете использовать это свойство, чтобы завершить работу с пользователем консоли после прочтения всех сообщений.

person Planck35    schedule 30.09.2019

Вы можете использовать SimpleConsumerShell с опцией no-wait-at-logend. См. раздел SystemTools-SimpleConsumerShell.

Например:

./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
person Ujjwal Wadhawan    schedule 07.03.2016

Если вы не уверены в использовании клиента Scala, попробуйте kafkacat с параметром -e сообщая ему выйти, когда будет достигнут конец раздела.

Например. чтобы использовать все сообщения из раздела mytopic 2, а затем выйти:

$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e

Или используйте последние 3000 сообщений, а затем выйдите:

$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e
person Edenhill    schedule 18.12.2014