Как захватить данные в mysql с помощью захвата данных изменения debezium и потребить с помощью приемника jdbc в kafka connect?

У меня проблема с захватом данных в mysql с захватом данных изменения debezium и их потреблением на другой mysql с помощью kafka connect jdbc.

Поскольку схема и полезные данные, которые debezium создает для темы kafka, несовместимы со схемой, которую ожидает приемник kafka connect jdbc.

Я получаю исключение, когда приемник jdbc хочет использовать данные и создавать записи в другом mysql.

Как мне решить эту проблему?


person Jason Foster    schedule 14.05.2017    source источник


Ответы (1)


Структура сообщения, созданная Debezium, действительно отличается от той, которую ожидает приемник JDBC. Приемник JDBC ожидает, что каждое поле в сообщении будет соответствовать полю в строке, и поэтому сообщение соответствует состоянию «после» строки. OTOH, соединитель Debezium MySQL выполняет сбор измененных данных, что означает больше, чем просто включение последнее состояние строки. В частности, коннектор выводит сообщения с ключом, содержащим столбцы первичного или уникального ключа строки, и значение сообщения, содержащее структуру конверта с:

  • операция, например вставка, обновление или удаление
  • состояние строки до того, как произошло изменение (null для вставок)
  • состояние строки после, когда произошло изменение (ноль при удалении)
  • информация, зависящая от источника, включая метаданные сервера, идентификатор транзакции, имена базы данных и таблиц, отметку времени сервера, когда произошло событие, а также сведения о том, где было обнаружено событие, и т. д.
  • отметка времени, когда коннектор сгенерировал событие

Самый простой способ устранить это несоответствие - использовать Kafka 0.10.2.x (в настоящее время последняя версия - 0.10.2.1) и новый Преобразования отдельных сообщений (SMT). Каждый соединитель Kafka Connect может быть сконфигурирован с цепочками из нуля или более SMT, которые могут преобразовывать вывод исходных соединителей до того, как сообщения будут записаны в Kafka, или преобразовать сообщения, прочитанные из Kafka, прежде чем они будут переданы в качестве входных в соединители приемника. SMT намеренно очень просты, работают с одним сообщением и определенно не должны получать доступ к внешним ресурсам или поддерживать какое-либо состояние, и, следовательно, не заменяют Kafka Streams или другие системы обработки потоков, которые намного мощнее, могут объединять несколько входных потоков и могут выполнять очень сложные операции и поддерживать состояние для нескольких сообщений.

Если вы используете Kafka Streams для какой-либо обработки, вам следует рассмотреть возможность управления структурой сообщений в вашем приложении Kafka Streams. Если нет, то SMT - отличный способ решить вашу проблему. Фактически, есть два способа использовать SMT для настройки структуры сообщения.

Первый вариант - использовать SMT с коннектором Debezium для извлечения / сохранения состояния «после» строки и отбрасывания всей остальной информации до того, как она будет записана в Kafka. Конечно, вы будете хранить меньше информации в темах Kafka и выбросить часть информации CDC, которая может оказаться полезной в будущем.

Второй и предпочтительный вариант IMO - оставить исходный соединитель как есть и сохранить все сообщения CDC в темах Kafka, но затем использовать SMT с соединителем приемника для извлечения / сохранения состояния «после» строки и отбросить всю остальную информацию до того, как сообщение будет передано коннектору приемника JDBC. Возможно, вы сможете использовать один из существующих SMT, включенных в Kafka Connect, но вы можете подумать о написании собственного SMT, чтобы делать именно то, что вы хотите.

person Randall Hauch    schedule 15.05.2017
comment
Спасибо, дорогой Рэндалл, за отличный ответ. - person Jason Foster; 15.05.2017
comment
Обратите внимание, что более новые версии Debezium предоставляют такой SMT прямо из коробки. У нас также есть полный пример, демонстрирующий взаимодействие с Коннектор раковины JDBC. - person Gunnar; 11.07.2018