Как установить offset.commit.policy на AlwaysCommitOffsetPolicy в debezium?

Я создал механизм Debezium Embedded для сбора данных об изменениях MySQL. Я хочу зафиксировать смещения как можно скорее. В коде создается конфигурация, включая следующие.

.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())

Запуск этого возврата, java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)

Однако, когда я запускаю встроенный механизм с .with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()), встроенный механизм работает нормально.

Обратите внимание, что конструктор класса OffsetCommitPolicy.PeriodicCommitOffsetPolicy включает параметр конфигурации, а OffsetCommitPolicy.AlwaysCommitOffsetPolicy - нет.

public PeriodicCommitOffsetPolicy(Configuration config) { ... }

Как заставить встроенный движок Debezium использовать его AlwaysCommitOffsetPolicy?


person chathuranga siriwardhana    schedule 10.10.2018    source источник


Ответы (2)


Спасибо за отчет. Отчасти это ошибка (мы были бы признательны, если бы вы могли войти в нашу Jira). Вы можете решить эту проблему, вызвав специальный построитель встроенного движка, например io.debezium.embedded.EmbeddedEngine.create (). With (OffsetCommitPolicy.always ()) '

person Jiri Pechanec    schedule 11.10.2018
comment
Спасибо. io.debezium.embedded.EmbeddedEngine.create().using(OffsetCommitPolicy.always()) отлично работает. - person chathuranga siriwardhana; 11.10.2018

Протестировано с версией 1.4.0Final:

new EmbeddedEngine.BuilderImpl() // create builder
        .using(config) // regular config
        .using(OffsetCommitPolicy.always()) // explicit commit policy
        .notifying(this::handleEvent) // even procesor
        .build(); // and finally build!
person PranasB    schedule 02.02.2021