Установить динамический путь для контрольной точки Flink для задания Flink в Yarn Cluser

Я использую Yarn для выполнения заданий Flink. Для каждого задания Flink я создаю контрольную точку.

Я отправляю задание Flink, которое выполняется в моем кластере Yarn. У меня есть задание на опрос, которое проверяет, не удалось ли выполнить задание на Yarn, и перезапускает его. Когда задание отправляется снова, Yarn создает новый application_id для этого задания Flink. Как я могу настроить повторно отправленное задание Flink для использования контрольной точки для перезапущенного задания Flink.

Я установил conf state.savepoints.dir = hdfs://localhost:9000/checkpoint/ в flink-conf.yaml`

При создании задания Flink streamExecutionEnvironment.setStateBackend(new FsStateBackend("hdfs://localhost:9000/checkpoint/uuid-job-1"));

Когда я установил этот параметр, контрольная точка сохраняется в пути, указанном в файле conf (hdfs://localhost:9000/checkpoint/), а не в пути, который я установил при создании задания Flink.

Любая помощь будет оценена. Спасибо!


person user3107673    schedule 22.05.2018    source источник


Ответы (1)


К сожалению, вы не можете начать новую работу с контрольных точек из старой. Что вы можете сделать, так это использовать внешние контрольные точки. Одним из недостатков flink ‹= 1.5 является то, что метаданные для внешних контрольных точек хранятся в одном каталоге для всех заданий, который задается параметром конфигурации: state.checkpoints.dir. Но вы можете изменять его перед каждой отправкой.

Еще одно примечание из списка рассылки поток:

Хорошей новостью является то, что Flink 1.5 немного переработает принцип работы внешних контрольных точек: в основном, все контрольные точки теперь могут считаться внешними, и метаданные будут храниться в корневом каталоге контрольной точки, а не в одном глобальном каталоге для всех заданий. Таким образом, метаданные для внешних контрольных точек находятся в каталоге контрольных точек каждого задания, и восстановление из них должно быть достаточно простым.

person Dawid Wysakowicz    schedule 23.05.2018
comment
But you can alter it before each submission - Вы хотите сказать, что я могу установить определенное местоположение для каждого задания, которое я отправляю с помощью Yarn. Я пробовал это. Я использовал определенное место для каждой присланной мной работы. Но это не сработало. - person user3107673; 23.05.2018
comment
Вы выполняете свою работу по каждому кластеру? или несколько рабочих мест в одном кластере? Вы можете изменить этот параметр для каждого кластера. Чтобы изменить этот параметр, вы должны отредактировать flink-conf.yaml. - person Dawid Wysakowicz; 24.05.2018
comment
Задание Flink выполняется в контейнере Yarn (несколько контейнеров в контейнере). Я разворачиваю задание в контейнере Yarn. По некоторым причинам мое приложение Yarn умирает, и я перезапускаю это задание. Я хочу, чтобы эту работу можно было продолжить с последней контрольной точки. Можно ли указать уникальный путь для каждого задания, чтобы, если задание умирает, и я повторно отправляю задание в Yarn, задание Flink будет использовать свою последнюю сохраненную контрольную точку. - person user3107673; 24.05.2018
comment
Как я уже сказал, вы можете сделать это с включенными внешними контрольными точками, и вам нужно будет изменить настройку state.checkpoints.dir в flink-conf.yaml перед каждой отправкой задания. К сожалению, вы не можете изменить этот параметр в коде. - person Dawid Wysakowicz; 24.05.2018
comment
@DawidWysakowicz правильно ли я прочитал, что невозможно установить state.checkpoints.dir программно? - person Yar; 26.03.2021