Проблемы с потребителями при обновлении RocksDB

У меня есть работа, которая потребляет от RabbitMQ, я использовал FS State Backend, но кажется, что размеры состояний стали больше, и затем я решил переместить свои состояния в RocksDB. Проблема в том, что в течение первых часов работы с заданием все в порядке, событие через большее время, если трафик становится медленнее, но затем, когда трафик снова становится высоким, у потребителя возникают проблемы (события накапливаются как неупакованные), а затем эти проблемы отражено в остальной части приложения.

У меня: 4 ядра процессора
Локальный диск
16 ГБ ОЗУ
Среда Unix
Flink 1.11
Scala версии 2.11
1 одно задание, выполняющееся с несколькими ключевыми потоками, и около 10 преобразований и опускаемся до Postgres

некоторые конфигурации

flink.buffer_timeout=50
flink.maxparallelism=4
flink.memory=16
flink.cpu.cores=4
#checkpoints
flink.checkpointing_compression=true
flink.checkpointing_min_pause=30000
flink.checkpointing_timeout=120000
flink.checkpointing_enabled=true
flink.checkpointing_time=60000
flink.max_current_checkpoint=1
#RocksDB configuration
state.backend.rocksdb.localdir=home/username/checkpoints (this is not working don't know why)
state.backend.rocksdb.thread.numfactory=4
state.backend.rocksdb.block.blocksize=16kb
state.backend.rocksdb.block.cache-size=512mb
#rocksdb or heap
state.backend.rocksdb.timer-service.factory=heap (I have test with rocksdb too and is the same)
state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED

Сообщите мне, если потребуется дополнительная информация?


person Alejandro Deulofeu    schedule 10.09.2020    source источник


Ответы (1)


state.backend.rocksdb.localdir должен быть абсолютным, а не относительным путем. И этот параметр не указывает, куда идут контрольные точки (чего не должно быть на локальном диске), этот параметр предназначен для указания, где сохраняется рабочее состояние (которое должно быть на локальном диске).

Ваша работа испытывает противодавление, а это означает, что какая-то часть конвейера не успевает за ней. Наиболее частыми причинами противодавления являются (1) неспособные к работе поглотители и (2) неадекватные ресурсы (например, слишком низкий параллелизм).

Вы можете проверить, является ли проблема с postgres, запустив задание с отбрасывающим приемником.

Анализ различных показателей должен дать вам представление о том, какие ресурсы могут быть недостаточно выделены.

person David Anderson    schedule 10.09.2020
comment
Я проверял противодавление в Flink Dashboard, и ни один из операторов не имел противодавления, по крайней мере, так говорят показатели, но не уверен, что это на 100% правда. У меня всего 4 ядра процессора, как вы думаете, стоит ли увеличить параллелизм? этот state.backend.rocksdb.localdir сейчас отключен. Я могу запустить тест, отбросив Sink, но не уверен, что это будет решение, но я сделаю это. - person Alejandro Deulofeu; 10.09.2020
comment
Локальный каталог Rockdb по умолчанию - / tmp. - person David Anderson; 10.09.2020
comment
Как вы думаете, увеличение параллелизма - хорошая идея? - person Alejandro Deulofeu; 10.09.2020
comment
Монитор противодавления в Flink Dashboard не является полностью надежным индикатором; он мог бы сказать, что все в порядке, когда это не так. Но проверяли ли вы противодавление в каждой подзадаче? Если исходники Flink не успевают за входными данными RabbitMQ, значит, что-то находится под давлением. - person David Anderson; 10.09.2020
comment
Да, я проверил все подзадачи, и все они были отмечены как ОК. Я сначала увеличу параллелизм до 8 и посмотрю, станет ли оно лучше, если нет, я отключу раковины, чтобы увидеть, это вас осмыслит? - person Alejandro Deulofeu; 10.09.2020
comment
Увеличивая параллелизм, я получаю более высокое использование ЦП и памяти, как и ожидалось, я думаю, контрольные точки немного больше, но этого следовало ожидать, но в положительном смысле у меня есть стабильный потребитель и задержка на данный момент. - person Alejandro Deulofeu; 10.09.2020