Источник событий - это архитектурный паттерн, который сохраняет все изменения состояния приложения в виде последовательности событий, а затем определяет текущее состояние путем воспроизведения событий.
Мы можем не только запрашивать эти события, мы также можем использовать журнал событий для восстановления прошлых состояний в произвольные моменты времени и в качестве основы для корректировки состояния, чтобы справиться с ретроактивными изменениями.
В недавнем обсуждении источников событий и баз данных я внезапно вспомнил:
«PostgreSQL - УДИВИТЕЛЬНЫЙ»
Свежее осознание потрясающих возможностей PostgreSQL, в свою очередь, заставило меня спросить:
«Как бы выглядела система с источником событий, если бы мы поместили как можно больше в саму базу данных?».
И впоследствии меня привели в кроличью нору экспериментов и чудес.
Подход
События хранятся в базе данных, и прогнозы создаются немедленно с использованием триггеров и функций.
Основная последовательность действий:
событие
| ›после триггера вставки
|› функция триггера
| ›функция проекции
|› проекция
По причинам, которые станут понятны, мы используем функцию триггера, чтобы обернуть другую функцию для создания проекта.
У этой модели есть несколько преимуществ:
- Триггеры гарантируют, что прогнозы всегда актуальны;
- Прогнозы происходят в контексте транзакции начальной вставки события; а также
- Мы не теряем возможность воспроизвести поток событий, повторно используя логику проекции (подробнее об этом позже)
События
События хранятся в простой таблице событий.
CREATE TABLE "events" ( "id" serial primary key not null, "uuid" uuid NOT NULL, "type" text NOT NULL, "body" jsonb NOT NULL, "inserted_at" timestamp(6) NOT NULL DEFAULT statement_timestamp() );
Несколько замечаний:
а) мы предполагаем, что все объекты / сущности в системе имеют глобальный уникальный идентификатор или uuid. UUID, вероятно, нужно управлять вне таблицы событий.
б) все события имеют тип, например «user_create» или «post_delete»
c) данные событий хранятся как json с использованием столбца jsonb (postgresql - это круто)
Имея структуру на месте, мы можем хранить события в таблице.
Здесь у нас есть пример события, отслеживающего обновление имени идентификатора пользователя по указанному uuid.
insert into events (type, uuid, body) values ('user_update', '11111111-1111-1111-1111-111111111111', '{"name"
Функции проецирования
Функция проекции выполняет фактическую работу по обработке данных события и сопоставлению с соответствующей проекцией.
Например, в случае события user_update, созданного выше, мы хотели бы взять значение name из тела события и обновить доступная только для чтения запись пользователя.
Для этого мы можем создавать функции в PostgreSQL.
Предполагая, что у нас есть таблица users с именем и uuid идентификатором для пользователей, следующая функция обновляет запись пользователя в таблице на основе ` user_update` событие:
create or replace function fn_project_user_update(uuid uuid, body jsonb) returns void security definer language plpgsql as $$ begin update users SET name = body->>’name’, updated_at = NOW() where users.uuid = fn_project_user_update.uuid; end; $$;
Поскольку PostgreSQL великолепен, мы можем извлекать данные из тела json, используя синтаксис body - ›› ’name’.
Триггеры
Когда у нас есть функция проекции, мы можем обернуть ее триггером, чтобы он вызывался при вставке. Триггеры в PostgreSQL могут быть условными, поэтому мы можем гарантировать, что триггер будет выполнен только в том случае, если был вставлен соответствующий тип события.
create or replace function fn_trigger_user_update() returns trigger security definer language plpgsql as $$ begin perform fn_project_user_update(new.uuid, new.body); return new; end; $$; create trigger event_insert_user_update after insert on events for each row when (new.type = ‘user_update’) execute procedure fn_trigger_user_update();
Триггер - это просто оболочка для функции проекции, которая передает новые данные.
При необходимости более сложные события можно моделировать как вызовы нескольких функций проекции.
Воспроизведение потока событий
Использование функций проекции означает, что события могут обрабатываться вне механизма запуска таблицы, и в любой момент события могут быть воспроизведены, просто вызвав функцию и передав правильный идентификатор и данные.
Используя больше возможностей PostgreSQL, мы можем запрашивать и воссоздавать поток событий.
Следующее воссоздает события user_ * для пользователя, идентифицированного указанным uuid.
do language plpgsql $$ declare e record; begin for e in select type, uuid, body from events where uuid = ‘11111111–1111–1111–1111–111111111111’ order by inserted_at asc loop case e.type when ‘user_create’ then perform fn_project_user_create(e.uuid, e.body); when ‘user_update’ then perform fn_project_user_update(e.uuid, e.body); end case; end loop; end; $$;
Любой действительный запрос может использоваться в качестве основы для цикла воспроизведения и любая комбинация допустимых обработчиков событий.
Но подождите, это еще не все.
PostgreSQL не ограничивается только итеративной обработкой событий, проекция может быть просто динамическим запросом к таблице событий (при условии, что производительность не является или проблемой), или быть материализованным представлением или любой другой подходящей конструкцией.
Здесь мы создаем материализованное представление для отслеживания uuid и имен пользователей в таблице событий:
create materialized view users_view as with t as ( select *, row_number() over(partition by uuid order by inserted_at desc) as row_number from events where type = ‘user_update’ ) select uuid, body->>’name’ as name, inserted_at from t where row_number = 1; select * from users_view;
В этом случае мы предполагаем, что самое последнее событие обновления содержит правильные данные пользователя, и запрашиваем, чтобы найти самое последнее событие user_update для каждого идентифицированного пользователя.
В PostgreSQL есть большое количество инструментов, которые стоит изучить, если вы рассматриваете модель источника событий.
Пример кода находится на tobyhede / postgresql-event-sourcing.