Структурированное потоковое хранилище Kafka Source Offset Storage

Я использую источник структурированной потоковой передачи для Kafka (руководство по интеграции ), который, как указано, не фиксирует никакого смещения.

Одна из моих целей - следить за ним (проверить, не отстает ли он и т. д.). Несмотря на то, что он не фиксирует смещения, он обрабатывает их, время от времени запрашивая kafka и проверяя, какое из них будет обрабатываться следующим. Согласно документации смещения записываются в HDFS, поэтому в случае сбоя его можно восстановить, но вопрос:

Где они хранятся? Есть ли способ отслеживать потребление kafka (извне программы, поэтому kafka cli или аналогичный, смещение, приходящее с каждой записью, не подходит для варианта использования) искрового потока (структурированного), если он не фиксирует смещения ?

Ваше здоровье


person ppanero    schedule 27.04.2017    source источник


Ответы (3)


Структурированная потоковая передача для kafka сохраняет смещения в HDFS ниже структур.

Пример настройки checkpointLocation приведен ниже.

.writeStream.
.....
  option("checkpointLocation", "/tmp/checkPoint")
.....

В этом случае Structured Streaming для kafka сохраняет путь ниже

/tmp/checkPoint/offsets/$'batchid'

Сохраненный файл имеет следующий формат.

v1
{"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition1'}}

Например.

v1
{"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}

Итак, я думаю, что для мониторинга задержки смещения необходимо разработать специальные инструменты, которые имеют следующие функции.

  • Чтение по смещениям из HDFS.
  • Напишите смещение в теме Kafka __offset.

Таким образом, уже существующий инструмент мониторинга задержки смещения может отслеживать структурированную потоковую передачу на предмет задержки смещения kafka.

person kimutansk    schedule 18.09.2017

Способ 1. Если вы настроили checkpointLocation (HDFS/S3 и т. д.), перейдите по пути и найдете два каталога offsets и commits. Смещения содержат текущие смещения, в то время как коммиты имеют последние зафиксированные смещения. Вы можете перейти в каталог коммитов и открыть последний измененный файл, в котором вы можете найти последние зафиксированные смещения. В то время как последний файл в каталоге смещений содержит информацию об использованных смещениях.

Метод 2. Вы также можете отслеживать то же самое со следующими конфигурациями:

class CustomStreamingQueryListener extends StreamingQueryListener with AppLogging {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logDebug(s"Started query with id : ${event.id}," +
      s" name: ${event.name},runId : ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val progress = event.progress
    logDebug(s"Streaming query made progress: ${progress.prettyJson}")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logDebug(s"Stream exited due to exception : ${event.exception},id : ${event.id}, " +
      s"runId: ${event.runId}")
  }

}

и добавьте его в конфигурацию ваших потоков.

spark.streams.addListener(new CustomStreamingQueryListener())
person Akhil Bojedla    schedule 27.07.2018

Несколько замечаний:

Мониторинг: готовый мониторинг можно найти на вкладке «Потоковая передача» задания Spark. Вы можете увидеть, какая текущая партия обрабатывается и сколько находится в очереди, чтобы проверить отставание.

Проверить максимальное и минимальное смещение для темы: у вас есть кли для проверки. Можно использовать ниже с сервера, на котором присутствует брокер kafka:

kafka-run-class \
kafka.tools.GetOffsetShell \
--broker-list your_broker1:port,your_broker2:port,your_broker3:port \
--topic your_topic \
--time -2

Более подробную информацию можно получить при интеграции с Grafana.

person Ramzy    schedule 29.04.2017
comment
Спасибо за комментарий. Однако: - Для мониторинга В уже сделанном мониторинге для Spark Structured Streaming нет такого понятия, как пакеты или записи с очередью, как в Spark Streaming. Или, по крайней мере, я не вижу его в пользовательском интерфейсе. - person ppanero; 01.05.2017
comment
Спасибо за комментарий. Однако: - Для инструмента kafka проблема заключается в том, что этот инструмент не поддерживает SSL. Пиар на него есть, но не слияние (или релиз пока). Кроме того, это дало бы мне смещение темы, но не то, насколько отстает мой потребитель, поскольку это еще один инструмент в kafka, но проблема в том, что Spark Structured Streaming не фиксирует смещения (опция 'auto.commit.offset ' не поддерживается), поэтому смещения обрабатываются ТОЛЬКО с помощью spark в какой-то папке, которую я не могу найти. - person ppanero; 01.05.2017
comment
Да, вы были правы, искра не спасет смещения. Мы просто сохраняем их в базу данных mysql. Или вы можете записать в файл. Однако это не структурированная потоковая передача. Вы должны иметь возможность получить смещение, по крайней мере, во время работы программы, а затем сохранить его. - person Ramzy; 01.05.2017
comment
Да это правда. Я хотел знать, могу ли я случайно этого избежать. Согласно документации, Sparks использует WAL для сохранения смещений в HDFS (думаю, только с включенной контрольной точкой), но мне не удалось увидеть, где именно. Спасибо! - person ppanero; 02.05.2017
comment
Если мы используем dstreams, то WAL не используются. Это только для старой реализации - person Ramzy; 02.05.2017
comment
Я использую структурированную потоковую передачу, а не обычную потоковую передачу Spark. Поэтому кадры данных. Ни RDD, ни DStreams (если я не ошибаюсь). Слайды на нем: es. slideshare.net/julesdamji/ От авторов. Как видите, на слайде 39 упоминается WAL, так что в данном случае он есть. Я просто не могу найти его. - person ppanero; 02.05.2017