Захват событий изменений из PostgreSQL с помощью Debezium, развернутого в Docker Compose

В этом руководстве вы узнаете, как развернуть образ Postgres Docker, который можно использовать с Debezium для сбора измененных данных (CDC). Захваченные изменения затем записываются в тему Kafka. Позже мы воспользуемся утилитой kafkacat, чтобы изучить содержание этой темы.

В учебном пособии предполагается:

  • Вы знакомы с базой данных PostgreSQL и Debezium.
  • У вас есть практический опыт работы с Docker Compose.

Сценарий

Давайте возьмем простой вариант использования и построим на нем наш проект.

Представьте, что у нас есть shipments база данных в Postgres, которая отслеживает отгрузки заказов. Он имеет shipments таблицу в следующем формате.

shipment_db=# \d shipments;
                        Table "public.shipments"
    Column    |          Type          | Collation | Nullable | Default
--------------+------------------------+-----------+----------+---------
 shipment_id  | bigint                 |           | not null |
 order_id     | bigint                 |           | not null |
 date_created | character varying(255) |           |          |
 status       | character varying(25)  |           |          |
Indexes:
    "shipments_pkey" PRIMARY KEY, btree (shipment_id)
shipment_db=#

Каждый раз, когда приложение обновляет статус доставки заказа, отправляя запрос, как показано ниже:

UPDATE shipments SET status = "COMPLETED" WHERE order_id = 12500;

Нам необходимо зафиксировать это изменение как событие и доставить его брокеру событий, например Apache Kafka, чтобы нижестоящие службы могли соответствующим образом отреагировать на это.

Для этого мы используем Дебезиум.

Настройка Debezium с PostgreSQL

Первый шаг - настроить Debezium на мониторинг журнала упреждающей записи (WAL) Postgres, чтобы Debezium мог захватывать и передавать события изменений в Kafka.

Для этого нам нужно выполнить некоторые настройки на стороне Postgres. Давайте посмотрим на них подробнее.

Postgres и плагины логического декодирования

Логическое декодирование - это процесс извлечения всех постоянных изменений в таблицы базы данных в читаемый, простой для понимания формат, который можно интерпретировать без детального знания внутреннего состояния базы данных.

Начиная с PostgreSQL 9.4, логическое декодирование реализовано путем декодирования содержимого журнала упреждающей записи и их обработки в удобной для пользователя форме с помощью плагина вывода. Плагин вывода позволяет клиентам принимать изменения.

В Debezium есть Коннектор Postgres, который работает со следующими плагинами вывода.

  • Protobuf для кодирования изменений в формате Protobuf.
  • Wal2json для кодирования изменений в формате JSON.

Плагин вывода логического декодирования больше не нужен, если вы используете версию Debezium выше 0.10. Начиная с Debezium 0.10, коннектор поддерживает поток логической репликации PostgreSQL 10+ с использованием pgoutput,, который генерирует изменения непосредственно из потока репликации.

Суть в том, что вам может потребоваться создать и установить плагин вывода в Postgres, чтобы получать события изменений в Debezium в желаемом формате. Кроме того, это требует изменения некоторых настроек в файле конфигурации Postgress.

Подробную информацию об этом вы можете найти в документации Debezium.

Давайте использовать образ Docker, в котором уже есть плагины

В этом руководстве будет использоваться образ Postgress Docker, поддерживаемый Debezium, который содержит плагины логического декодирования и необходимые конфигурации Postgres.

На изображении вы можете увидеть файл конфигурации, подобный следующему. Это сообщает Postgress загрузить указанный shared_preload_libraries при запуске и установить для wal_level значение logical.

Настроить и запустить проект

Вы можете найти готовый проект в репозитории EdU samples Git repo. Он содержит только один файл docker-compose.yml, как показано ниже.

Идите и выполните следующее, чтобы клонировать репо и раскручивать контейнеры с помощью Docker Compose.

git clone https://github.com/dunithd/edu-samples.git
cd edu-samples/postgres-docker
docker-compose up -d

Если все пойдет хорошо, то появятся следующие контейнеры.

  • PostgreSQL - это база данных отгрузки, содержащая таблицу отгрузки.
  • Kafka + Zookeeper - брокер событий, в котором хранятся события CDC.
  • Реестр схем - служба Kafka, используемая Debezium для сериализации / десериализации сообщений CDC с использованием схемы Avro.
  • Debezium - фиксирует изменения, внесенные в Postgres, и передает их в тему Kafka.

База данных shipments

Во время запуска контейнер Postgres создает базу данных с именем shipment_db вместе с учетной записью пользователя. Затем сценарий shipments-db.sql монтирования тома используется для создания таблицы shipments и заполнения ее некоторыми записями.

Войдите в контейнер Postgres и используйте psql, чтобы просмотреть его содержимое.

shipment_db=# select * from shipments;
 shipment_id | order_id | date_created |   status
-------------+----------+--------------+------------
       30500 |    10500 | 2021-01-21   | COMPLETED
       31500 |    11500 | 2021-04-21   | COMPLETED
       32500 |    12500 | 2021-05-31   | PROCESSING
(3 rows)

Зарегистрируйте коннектор Postgres в Debezium

Теперь, когда у нас есть Debezium, он работает. Затем мы скажем Debezium, что нужно передавать изменения из shipment_db. Делаем это путем регистрации коннектора.

Войдите в контейнер Debezium и выполните его в терминале.

Обратите внимание, что plugin.name установлен на pgoutput. Остальные конфигурации говорят сами за себя.

После первой инициализации коннектора создается согласованный снимок shipment_db. После создания этого моментального снимка соединитель непрерывно фиксирует изменения на уровне строк, которые вставляют, обновляют и удаляют содержимое базы данных.

Если до этого момента все идет хорошо, вы должны увидеть тему, созданную в Kafka с именем postgres.public.shipments.

Посмотреть в действии

Давайте обновим запись в таблице отгрузок и посмотрим, отображается ли событие изменения в Kafka. Мы воспользуемся утилитой kafkacat, чтобы увидеть содержание темы.

Войдите в контейнер Postgres и выполните следующий запрос к shipment_db.

shipment_db=# update shipments set status='COMPLETED' where order_id = 12500;

Это обновит статус доставки заказа с ID 12500. Debezium фиксирует это изменение, преобразует его в формат Avro и записывает в тему postgres.public.shipments Kafka.

Чтобы увидеть содержимое, выполните следующие действия.

docker run --tty \
--network postgres-docker_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:9092 -C \
-s key=s -s value=avro \
-r http://schema-registry:8081 \
-t postgres.public.shipments

Вы должны увидеть результат, как показано ниже.

{"before": null, "after": {"Value": {"shipment_id": 32500, "order_id": 12500, "date_created": {"string": "2021-05-31"}, "status": {"string": "COMPLETED"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1628588482556, "snapshot": {"string": "false"}, "db": "shipment_db", "schema": "public", "table": "shipments", "txId": {"long": 493}, "lsn": {"long": 23832280}, "xmin": null}, "op": "u", "ts_ms": {"long": 1628588483027}, "transaction": null}

Куда пойти отсюда?

Это руководство служит отправной точкой, если вы планируете использовать Postgres и Debezium с Docker Compose. Образ Postgres, поддерживаемый Debezium, избавляет от необходимости устанавливать плагины логического декодера и изменять конфигурации Postgres.

Теперь вы можете расширить эту настройку, чтобы создать свои варианты использования. Например, вы перенаправляете события изменений в другую базу данных, загружаете их в хранилище данных или обновляете Elasticsearch.

Обратитесь к следующим статьям, которые я написал когда-то назад, чтобы узнать, что вы можете сделать с отслеживанием измененных данных.

8 практических примеров использования сбора измененных данных

Нежное введение в сбор данных об изменениях на основе событий

Визуальное знакомство с Дебезиумом