Я знаю, что должен быть способ сделать это, но я не могу понять это. Мне нужно остановить потребителя kafka, как только я прочитаю все сообщения из очереди.
Может ли кто-нибудь предоставить любую информацию об этом?
Я знаю, что должен быть способ сделать это, но я не могу понять это. Мне нужно остановить потребителя kafka, как только я прочитаю все сообщения из очереди.
Может ли кто-нибудь предоставить любую информацию об этом?
Вы можете передать параметр: -consumer-timeout-ms со значением при запуске потребителя, и он выдаст исключение, если за это время не было прочитано ни одного сообщения. Например, чтобы остановить потребителя, если за последние 2 секунды не поступило новых сообщений: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000
Вы можете увидеть это и все остальные введите параметры здесь
./kafka-console-consumer.sh --bootstrap-server my-host:9092 --new-consumer --topic test-topic --from-beginning --timeout-ms 2000
Я пробовал, и она работает нормально, хотя при выходе выводит сообщение об ошибке и трассировку стека.
- person bischoje; 03.11.2017
В настоящее время в Kafka версии 2.11-2.1.1 есть скрипт с именем kafka-console-consumer.sh
.
У него новый флаг: --timeout-ms
.
По сути, этот флаг представляет собой максимальное время ожидания перед выходом, когда нет нового журнала для ожидания. Это в миллисекундах.
Вы можете использовать это свойство, чтобы завершить работу с пользователем консоли после прочтения всех сообщений.
Вы можете использовать SimpleConsumerShell с опцией no-wait-at-logend. См. раздел SystemTools-SimpleConsumerShell.
Например:
./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
Если вы не уверены в использовании клиента Scala, попробуйте kafkacat с параметром -e
сообщая ему выйти, когда будет достигнут конец раздела.
Например. чтобы использовать все сообщения из раздела mytopic 2, а затем выйти:
$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e
Или используйте последние 3000 сообщений, а затем выйдите:
$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e