Структура сообщения, созданная Debezium, действительно отличается от той, которую ожидает приемник JDBC. Приемник JDBC ожидает, что каждое поле в сообщении будет соответствовать полю в строке, и поэтому сообщение соответствует состоянию «после» строки. OTOH, соединитель Debezium MySQL выполняет сбор измененных данных, что означает больше, чем просто включение последнее состояние строки. В частности, коннектор выводит сообщения с ключом, содержащим столбцы первичного или уникального ключа строки, и значение сообщения, содержащее структуру конверта с:
- операция, например вставка, обновление или удаление
- состояние строки до того, как произошло изменение (null для вставок)
- состояние строки после, когда произошло изменение (ноль при удалении)
- информация, зависящая от источника, включая метаданные сервера, идентификатор транзакции, имена базы данных и таблиц, отметку времени сервера, когда произошло событие, а также сведения о том, где было обнаружено событие, и т. д.
- отметка времени, когда коннектор сгенерировал событие
Самый простой способ устранить это несоответствие - использовать Kafka 0.10.2.x (в настоящее время последняя версия - 0.10.2.1) и новый Преобразования отдельных сообщений (SMT). Каждый соединитель Kafka Connect может быть сконфигурирован с цепочками из нуля или более SMT, которые могут преобразовывать вывод исходных соединителей до того, как сообщения будут записаны в Kafka, или преобразовать сообщения, прочитанные из Kafka, прежде чем они будут переданы в качестве входных в соединители приемника. SMT намеренно очень просты, работают с одним сообщением и определенно не должны получать доступ к внешним ресурсам или поддерживать какое-либо состояние, и, следовательно, не заменяют Kafka Streams или другие системы обработки потоков, которые намного мощнее, могут объединять несколько входных потоков и могут выполнять очень сложные операции и поддерживать состояние для нескольких сообщений.
Если вы используете Kafka Streams для какой-либо обработки, вам следует рассмотреть возможность управления структурой сообщений в вашем приложении Kafka Streams. Если нет, то SMT - отличный способ решить вашу проблему. Фактически, есть два способа использовать SMT для настройки структуры сообщения.
Первый вариант - использовать SMT с коннектором Debezium для извлечения / сохранения состояния «после» строки и отбрасывания всей остальной информации до того, как она будет записана в Kafka. Конечно, вы будете хранить меньше информации в темах Kafka и выбросить часть информации CDC, которая может оказаться полезной в будущем.
Второй и предпочтительный вариант IMO - оставить исходный соединитель как есть и сохранить все сообщения CDC в темах Kafka, но затем использовать SMT с соединителем приемника для извлечения / сохранения состояния «после» строки и отбросить всю остальную информацию до того, как сообщение будет передано коннектору приемника JDBC. Возможно, вы сможете использовать один из существующих SMT, включенных в Kafka Connect, но вы можете подумать о написании собственного SMT, чтобы делать именно то, что вы хотите.
person
Randall Hauch
schedule
15.05.2017