Захват событий изменений из PostgreSQL с помощью Debezium, развернутого в Docker Compose
В этом руководстве вы узнаете, как развернуть образ Postgres Docker, который можно использовать с Debezium для сбора измененных данных (CDC). Захваченные изменения затем записываются в тему Kafka. Позже мы воспользуемся утилитой kafkacat, чтобы изучить содержание этой темы.
В учебном пособии предполагается:
- Вы знакомы с базой данных PostgreSQL и Debezium.
- У вас есть практический опыт работы с Docker Compose.
Сценарий
Давайте возьмем простой вариант использования и построим на нем наш проект.
Представьте, что у нас есть shipments
база данных в Postgres, которая отслеживает отгрузки заказов. Он имеет shipments
таблицу в следующем формате.
shipment_db=# \d shipments; Table "public.shipments" Column | Type | Collation | Nullable | Default --------------+------------------------+-----------+----------+--------- shipment_id | bigint | | not null | order_id | bigint | | not null | date_created | character varying(255) | | | status | character varying(25) | | | Indexes: "shipments_pkey" PRIMARY KEY, btree (shipment_id) shipment_db=#
Каждый раз, когда приложение обновляет статус доставки заказа, отправляя запрос, как показано ниже:
UPDATE shipments SET status = "COMPLETED" WHERE order_id = 12500;
Нам необходимо зафиксировать это изменение как событие и доставить его брокеру событий, например Apache Kafka, чтобы нижестоящие службы могли соответствующим образом отреагировать на это.
Для этого мы используем Дебезиум.
Настройка Debezium с PostgreSQL
Первый шаг - настроить Debezium на мониторинг журнала упреждающей записи (WAL) Postgres, чтобы Debezium мог захватывать и передавать события изменений в Kafka.
Для этого нам нужно выполнить некоторые настройки на стороне Postgres. Давайте посмотрим на них подробнее.
Postgres и плагины логического декодирования
Логическое декодирование - это процесс извлечения всех постоянных изменений в таблицы базы данных в читаемый, простой для понимания формат, который можно интерпретировать без детального знания внутреннего состояния базы данных.
Начиная с PostgreSQL 9.4, логическое декодирование реализовано путем декодирования содержимого журнала упреждающей записи и их обработки в удобной для пользователя форме с помощью плагина вывода. Плагин вывода позволяет клиентам принимать изменения.
В Debezium есть Коннектор Postgres, который работает со следующими плагинами вывода.
- Protobuf для кодирования изменений в формате Protobuf.
- Wal2json для кодирования изменений в формате JSON.
Плагин вывода логического декодирования больше не нужен, если вы используете версию Debezium выше 0.10. Начиная с Debezium 0.10, коннектор поддерживает поток логической репликации PostgreSQL 10+ с использованием pgoutput
,, который генерирует изменения непосредственно из потока репликации.
Суть в том, что вам может потребоваться создать и установить плагин вывода в Postgres, чтобы получать события изменений в Debezium в желаемом формате. Кроме того, это требует изменения некоторых настроек в файле конфигурации Postgress.
Подробную информацию об этом вы можете найти в документации Debezium.
Давайте использовать образ Docker, в котором уже есть плагины
В этом руководстве будет использоваться образ Postgress Docker, поддерживаемый Debezium, который содержит плагины логического декодирования и необходимые конфигурации Postgres.
На изображении вы можете увидеть файл конфигурации, подобный следующему. Это сообщает Postgress загрузить указанный shared_preload_libraries
при запуске и установить для wal_level
значение logical
.
Настроить и запустить проект
Вы можете найти готовый проект в репозитории EdU samples Git repo. Он содержит только один файл docker-compose.yml, как показано ниже.
Идите и выполните следующее, чтобы клонировать репо и раскручивать контейнеры с помощью Docker Compose.
git clone https://github.com/dunithd/edu-samples.git cd edu-samples/postgres-docker docker-compose up -d
Если все пойдет хорошо, то появятся следующие контейнеры.
- PostgreSQL - это база данных отгрузки, содержащая таблицу отгрузки.
- Kafka + Zookeeper - брокер событий, в котором хранятся события CDC.
- Реестр схем - служба Kafka, используемая Debezium для сериализации / десериализации сообщений CDC с использованием схемы Avro.
- Debezium - фиксирует изменения, внесенные в Postgres, и передает их в тему Kafka.
База данных shipments
Во время запуска контейнер Postgres создает базу данных с именем shipment_db
вместе с учетной записью пользователя. Затем сценарий shipments-db.sql
монтирования тома используется для создания таблицы shipments
и заполнения ее некоторыми записями.
Войдите в контейнер Postgres и используйте psql
, чтобы просмотреть его содержимое.
shipment_db=# select * from shipments; shipment_id | order_id | date_created | status -------------+----------+--------------+------------ 30500 | 10500 | 2021-01-21 | COMPLETED 31500 | 11500 | 2021-04-21 | COMPLETED 32500 | 12500 | 2021-05-31 | PROCESSING (3 rows)
Зарегистрируйте коннектор Postgres в Debezium
Теперь, когда у нас есть Debezium, он работает. Затем мы скажем Debezium, что нужно передавать изменения из shipment_db
. Делаем это путем регистрации коннектора.
Войдите в контейнер Debezium и выполните его в терминале.
Обратите внимание, что plugin.name
установлен на pgoutput
. Остальные конфигурации говорят сами за себя.
После первой инициализации коннектора создается согласованный снимок shipment_db
. После создания этого моментального снимка соединитель непрерывно фиксирует изменения на уровне строк, которые вставляют, обновляют и удаляют содержимое базы данных.
Если до этого момента все идет хорошо, вы должны увидеть тему, созданную в Kafka с именем postgres.public.shipments
.
Посмотреть в действии
Давайте обновим запись в таблице отгрузок и посмотрим, отображается ли событие изменения в Kafka. Мы воспользуемся утилитой kafkacat
, чтобы увидеть содержание темы.
Войдите в контейнер Postgres и выполните следующий запрос к shipment_db
.
shipment_db=# update shipments set status='COMPLETED' where order_id = 12500;
Это обновит статус доставки заказа с ID 12500. Debezium фиксирует это изменение, преобразует его в формат Avro и записывает в тему postgres.public.shipments
Kafka.
Чтобы увидеть содержимое, выполните следующие действия.
docker run --tty \ --network postgres-docker_default \ confluentinc/cp-kafkacat \ kafkacat -b kafka:9092 -C \ -s key=s -s value=avro \ -r http://schema-registry:8081 \ -t postgres.public.shipments
Вы должны увидеть результат, как показано ниже.
{"before": null, "after": {"Value": {"shipment_id": 32500, "order_id": 12500, "date_created": {"string": "2021-05-31"}, "status": {"string": "COMPLETED"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1628588482556, "snapshot": {"string": "false"}, "db": "shipment_db", "schema": "public", "table": "shipments", "txId": {"long": 493}, "lsn": {"long": 23832280}, "xmin": null}, "op": "u", "ts_ms": {"long": 1628588483027}, "transaction": null}
Куда пойти отсюда?
Это руководство служит отправной точкой, если вы планируете использовать Postgres и Debezium с Docker Compose. Образ Postgres, поддерживаемый Debezium, избавляет от необходимости устанавливать плагины логического декодера и изменять конфигурации Postgres.
Теперь вы можете расширить эту настройку, чтобы создать свои варианты использования. Например, вы перенаправляете события изменений в другую базу данных, загружаете их в хранилище данных или обновляете Elasticsearch.
Обратитесь к следующим статьям, которые я написал когда-то назад, чтобы узнать, что вы можете сделать с отслеживанием измененных данных.
8 практических примеров использования сбора измененных данных
Нежное введение в сбор данных об изменениях на основе событий