Flink: преобразование binlog в несколько DTO и метод преобразования во flink

Новичок в Kafka, Flink и Tidb. Предположим, у меня есть три исходные таблицы MySql s_a, s_b и s_c, и я хочу собирать записи для целевой таблицы TiDb t_a и t_b в реальном времени. Правила сопоставления

`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).

Решение, которое я принял, — это kafka + Flink с приемником Tidb, где изменения binlog подписываются на тему Kafka; Flink использует тему и записывает преобразованный результат в Tidb. Проблема для меня в части кода flink:

  1. как я могу легко восстановить строку json (содержащую информацию об операциях, таблицах), опрашиваемую из kafka, для различных операций DTO (например, вставка/создание t_a или t_b ). Я нашел инструмент под названием Debezium в качестве коннектора Kafka&Flink, но похоже, что он требует равенства между исходной и целевой таблицами.

  2. Как написать преобразование VKDataMapper, если у меня есть несколько целевых таблиц. Мне трудно определить T, так как это может быть t_a DTO (объект передачи данных) или t_b DTO.

Существующий пример кода для меня выглядит так:

// Основная процедура.

   StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
       //consume is FlinkkafkaConsumer. TopicFilter returns true. 
        environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
                .addSink(new TidbSink());

        try {
            environment.execute();
        } catch (Exception e) {
            log.error("exception {}", e);
        }
 
 
 public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        //How T can represents both `T_a data DTO` `T_b`...., 
        return null;
    }

}

person shijie xu    schedule 20.11.2020    source источник
comment
Вы просматривали github.com/ververica/flink-cdc-connectors? Возможно, это упростило бы дело.   -  person David Anderson    schedule 20.11.2020


Ответы (1)


Почему бы не попробовать Flink SQL? Таким образом, вам нужно только создать несколько таблиц во Flink, а затем определить свои задачи через sql, например:

insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

См. некоторые примеры в https://github.com/LittleFall/flink-tidb-rdw, не стесняйтесь спрашивать все, что вас смущает.

person Littlefall    schedule 27.11.2020
comment
Добро пожаловать на канал Slack сообщества TiDB ЗДЕСЬ, чтобы взаимодействовать с tidb и получайте более быстрые ответы~ - person Littlefall; 01.12.2020
comment
Спасибо @Littlefall. Видел flink-tidb-rdw, еще проверю. Не могли бы вы добавить WeChat (хотя я отправил вам письмо ранее)? - person shijie xu; 06.12.2020
comment
Да! Очень жаль, что так поздно, я только что отправил вам запрос на добавление в друзья. - person Littlefall; 15.12.2020