Коннектор источника эластичного поиска Flink

Я новичок в интеграции Flink и Elastic Search. У меня есть сценарий, в котором мне нужно загрузить данные истории (примерно 1 ТБ) из старого эластичного поискового кластера (5.6) в новый кластер (6.8). Мне нужно выполнить некоторую фильтрацию и модификацию данных во время миграции. Подумываю об использовании пакетного задания flink с оператором flink-es -ink.

Но поскольку в настоящее время нет доступного оператора flink-es-source, как лучше всего передать данные в мой конвейер flink? У меня есть несколько вариантов сделать это.

  1. Напишите функцию плоской карты / функцию процесса и получите запись
  2. Используйте некоторые сторонние библиотеки с открытым исходным кодом для подключения flink к ES. Но не хочу рисковать, потому что не знаю, как работают эти программы.

Но не уверен, что лучше, поскольку размер данных огромен, мне, возможно, придется распараллелить исходный оператор.

Пожалуйста, предложите несколько вариантов, если кто-то из вас сталкивался с этим сценарием. заранее спасибо


person Abhi    schedule 10.09.2020    source источник
comment
Пакетный (DataSet) API Flink может использоваться с любым входным форматом hadoop. Это могло бы быть решением.   -  person David Anderson    schedule 10.09.2020
comment
Но опять же, как я решу исходный параллелизм, потому что я не хочу снова и снова читать одни и те же данные из индексов эластичного поиска, если параллелизм ›1 правильный? не могли бы вы предоставить более подробную информацию о том, как решить эту проблему с помощью API данных   -  person Abhi    schedule 10.09.2020
comment
stackoverflow .com / questions / 63747019 / и stackoverflow.com/questions/54329298/ может помочь. И вы можете использовать getRuntimeContext().getIndexOfThisSubtask, если необходимо идентифицировать каждый параллельный экземпляр.   -  person David Anderson    schedule 10.09.2020