Я новичок в интеграции Flink и Elastic Search. У меня есть сценарий, в котором мне нужно загрузить данные истории (примерно 1 ТБ) из старого эластичного поискового кластера (5.6) в новый кластер (6.8). Мне нужно выполнить некоторую фильтрацию и модификацию данных во время миграции. Подумываю об использовании пакетного задания flink с оператором flink-es -ink.
Но поскольку в настоящее время нет доступного оператора flink-es-source, как лучше всего передать данные в мой конвейер flink? У меня есть несколько вариантов сделать это.
- Напишите функцию плоской карты / функцию процесса и получите запись
- Используйте некоторые сторонние библиотеки с открытым исходным кодом для подключения flink к ES. Но не хочу рисковать, потому что не знаю, как работают эти программы.
Но не уверен, что лучше, поскольку размер данных огромен, мне, возможно, придется распараллелить исходный оператор.
Пожалуйста, предложите несколько вариантов, если кто-то из вас сталкивался с этим сценарием. заранее спасибо
getRuntimeContext().getIndexOfThisSubtask
, если необходимо идентифицировать каждый параллельный экземпляр. - person David Anderson   schedule 10.09.2020