Проблемы с контрольными точками в Flink 1.10.1 с использованием серверной части состояния RocksDB

У нас возникла очень трудная для наблюдения проблема с нашей работой Flink.

Работа достаточно проста, она:

  1. Читает сообщения от Kinesis с помощью коннектора Flink Kinesis
  2. Сохраняет сообщения и передает их примерно 30 различным операторам CEP, а также нескольким настраиваемым функциям Window.
  3. Сообщения, отправленные из CEP / Windows, пересылаются SinkFunction, которая записывает сообщения в SQS.

Мы запускаем Flink 1.10.1 Fargate, используя 2 контейнера с 4vCPU / 8GB, мы используем бэкэнд состояния RocksDB со следующей конфигурацией:

state.backend: rocksdb
state.backend.async: true
state.backend.incremental: false
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.files.open: 130048

Задание выполняется с параллелизмом 8.

Когда задание начинается с "холодного" состояния, оно использует очень мало ЦП, а контрольные точки выполняются за 2 секунды. Со временем размеры контрольно-пропускных пунктов увеличиваются, но время все еще очень разумное - пара секунд:

размер контрольной точки и время контрольной точки

В течение этого времени мы можем наблюдать, как загрузка ЦП нашими диспетчерами задач по какой-то причине плавно растет:

Процессор диспетчера задач с течением времени

В конце концов, время контрольной точки начнет увеличиваться до нескольких минут, а затем просто начнет многократно истекать время (10 минут). На данный момент:

  • Размер контрольной точки (когда она будет завершена) составляет около 60 МБ.
  • Загрузка ЦП высокая, но не 100% (обычно около 60-80%)
  • Глядя на текущие контрольные точки, обычно 95% + операторов завершают контрольную точку за 30 секунд, но горстка просто застревает и никогда не завершит. Приемник SQS всегда будет включен в это, но SinkFunction не является богатым и не имеет состояния.
  • Использование монитора противодавления на этих операторах сообщает о ВЫСОКОМ противодавлении.

В конце концов, эта ситуация разрешается одним из двух способов:

  1. Достаточное количество контрольных точек не приводит к сбою задания из-за превышения порогового значения пропорции контрольной точки
  2. Контрольные точки в конечном итоге начинают работать успешно, но никогда не возвращаются к 5-10 секундам, которые они занимают изначально (когда размер состояния больше похож на 30 МБ против 60 МБ)

Мы действительно не понимаем, как это отладить. Наше государство кажется очень маленьким по сравнению с тем, что вы видите здесь в некоторых вопросах. Наши объемы также довольно низкие, мы очень часто оказываемся ниже 100 записей в секунду.

Мы были бы очень признательны за любой вклад в области, в которых мы могли бы изучить, чтобы отладить это.

Спасибо,


person Jamalarm    schedule 01.10.2020    source источник
comment
Используете ли вы TTL состояния, и если да, то как он настроен?   -  person David Anderson    schedule 01.10.2020


Ответы (2)


Несколько моментов:

Для государства нет ничего необычного в постепенном росте с течением времени. Возможно, ваше ключевое пространство растет, и вы сохраняете какое-то состояние для каждого ключа. Если вы полагаетесь на TTL состояния для истечения срока действия устаревшего состояния, возможно, он не настроен таким образом, чтобы он мог очищать истекшее состояние так быстро, как вы ожидали. Также относительно легко непреднамеренно создать шаблоны CEP, которые должны сохранять некоторое состояние в течение очень долгого времени, прежде чем можно будет исключить определенные возможные совпадения.

Следующим хорошим шагом будет определение причины противодавления. Чаще всего причина в том, что у работы нет достаточных ресурсов. Большинству рабочих мест постепенно со временем требуется больше ресурсов, так как количество управляемых пользователей (например) увеличивается. Например, вам может потребоваться увеличить параллелизм, или дать экземплярам больше памяти, или увеличить емкость приемника (ов) (или скорость сети до приемника (ов)), или предоставить RocksDB более быстрые диски.

Помимо недостаточной подготовки, к другим причинам противодавления относятся:

  • блокировка ввода-вывода выполняется в пользовательской функции
  • одновременно срабатывает большое количество таймеров
  • рассогласование времени событий между разными источниками приводит к буферизации большого количества состояний
  • перекос данных (горячая клавиша) подавляет одну подзадачу или слот
  • длинные паузы сборщика мусора
  • конкуренция за критические ресурсы (например, использование NAS в качестве локального диска для RocksDB)

Включение собственных метрик RocksDB может дать некоторое представление.

person David Anderson    schedule 02.10.2020

Добавьте это свойство в свою конфигурацию:

state.backend.rocksdb.checkpoint.transfer.thread.num: {threadNumberAccordingYourProjectSize}

если вы не добавите это, будет 1 (по умолчанию)

Ссылка: https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java#L62

person monstereo    schedule 07.10.2020