Реализация саги на основе событий

Я написал агрегат с источником событий, а теперь реализовал сагу с источником событий ... Я заметил, что они похожи друг на друга, и создал объект с источником событий в качестве базового класса, от которого оба являются производными.

Я видел здесь одну демонстрацию http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/, но чувствую, что может возникнуть проблема, поскольку команды могут быть потеряны в случае сбоя процесса, поскольку отправка команд находится вне транзакции записи?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

Вместо этого я использую EventStore Грега Янга, и когда я сохраняю EventSourcedObject (либо агрегат, либо сагу), последовательность действий выглядит следующим образом:

  1. Репозиторий получает список новых MutatingEvents.
  2. Записывает их в поток.
  3. EventStore запускает новые события, когда потоки записываются и фиксируются в потоке.
  4. Мы слушаем события из EventStore и обрабатываем их в EventHandlers.

Я реализую два аспекта саги:

  1. Принимать события, которые могут переходить в состояние, что, в свою очередь, может выдавать команды.
  2. Чтобы иметь будильник, когда в какой-то момент в будущем (через службу внешнего таймера) нам могут перезвонить).

Вопросы

  1. Насколько я понимаю, обработчики событий не должны выдавать команды (что произойдет, если команда не удастся?), Но согласен ли я с вышеизложенным, поскольку сага - это то, что фактически контролирует создание команд (в ответ на события ) через этот прокси-сервер event, и любой сбой отправки команды может быть обработан извне (во внешнем обработчике событий, который имеет дело с CommandEmittedFromSaga и повторно отправляет команду в случае сбоя команды)?

  2. Или я забываю обертывать события и хранить собственные Commands и Events в одном потоке (смешанном с сообщением базового класса - сага будет потреблять и команды, и события, а агрегат будет потреблять только события)?

  3. Есть ли в сети другие справочные материалы по реализации саг из событийного источника? С чем я могу проверить свои идеи?

Ниже приведен некоторый фоновый код.

Saga выдает команду "Выполнить" (заключенная в событие CommandEmittedFromSaga)

Команда ниже заключена в событие:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

Saga запрашивает обратный вызов в будущем (событие AlarmRequestedBySaga)

Запрос обратного вызова по тревоге переносится на событие и запускает событие и запускает сагу в запрошенное время или после него:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

В качестве альтернативы я могу хранить и команды, и события в одном потоке сообщения базового типа

public abstract class EventSourcedSaga
{
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transition during construction must not generate new
        // commands, as those command will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}

person morleyc    schedule 30.10.2015    source источник
comment
На мой взгляд, саги могут выдавать команды, обычно основанные на событиях, происходящих в нескольких совокупных корнях. Проблема, которую я вижу при записи команд, - это логика, необходимая для воспроизведения этих команд. Вам понадобится какая-то идемпотентность команды, как узнать, что команду нужно воспроизвести? Воспроизведение команды позже, после того, как произошли другие события, может изменить результаты. Если команда запускает взаимодействие со сторонними системами, каковы последствия ее повторного воспроизведения?   -  person Matt    schedule 03.11.2015
comment
@Matt правильно, GetEventStore будет публиковать все события по одному, мы можем запросить ответ с определенной уникальной позиции. Чтобы предотвратить повторное воспроизведение команд, мы бы, как EventStore, воспроизводили бы последнее известное событие вперед (поэтому мы сохраняем последнее отправленное событие / сообщение / команду после отправки)   -  person morleyc    schedule 03.11.2015
comment
Проблема в том, что все команды и события чередуются в потоке, и как узнать, что нужно пропустить команду? События, которые генерируются в результате выполнения команды, которую вы просматриваете в потоке, находятся где-то впереди в потоке (или нет, если команда / процесс не удалось выполнить).   -  person Matt    schedule 05.11.2015


Ответы (1)


Я считаю, что первые два вопроса являются результатом неправильного понимания менеджеров процессов (также известных как «Саги», см. Примечание о терминологии внизу).

Измените свое мышление

Похоже, вы пытаетесь смоделировать его (как я когда-то делал) как обратный агрегат. Проблема с этим: «социальный контракт» агрегата состоит в том, что его входные данные (команды) могут изменяться со временем (потому что системы должны иметь возможность изменяться с течением времени), а его выходы (события) - нет. После записи события становятся историей, и система всегда должна иметь возможность их обрабатывать. При наличии этого условия агрегат может быть надежно загружен из неизменяемого потока событий.

Если вы попытаетесь просто поменять местами входы и выходы в качестве реализации диспетчера процессов, его выход не может быть предметом записи, потому что команды могут быть устаревшими и со временем удалены из системы. При попытке загрузить поток с удаленной командой произойдет сбой. Следовательно, диспетчер процессов, смоделированный как обратный агрегат, не может быть надежно перезагружен из неизменяемого потока сообщений. (Я уверен, что вы могли бы придумать способ ... но мудро ли это?)

Итак, давайте подумаем о реализации диспетчера процессов и посмотрим, что он заменяет. Возьмем, к примеру, сотрудника, который управляет таким процессом, как выполнение заказов. Первое, что вы делаете для этого пользователя, - это настраиваете представление в пользовательском интерфейсе, чтобы он мог смотреть на него. Во-вторых, вы создаете кнопки в пользовательском интерфейсе, чтобы пользователь мог выполнять действия в ответ на то, что они видят в представлении. Бывший. "В этой строке есть PaymentFailed, поэтому я нажимаю CancelOrder. В этой строке есть PaymentSucceeded и OrderItemOutOfStock, поэтому я нажимаю ChangeToBackOrder. Этому порядку Pending и 1 день, поэтому я нажимаю _7 _" ... и так далее. Как только процесс принятия решения четко определен и начинает требовать слишком много времени пользователя, перед вами стоит задача автоматизировать этот процесс. Чтобы автоматизировать это, все остальное может остаться прежним (представление, даже часть пользовательского интерфейса, чтобы вы могли его проверить), но пользователь изменился, чтобы быть частью кода.

«Уходите, или я заменю вас очень маленьким сценарием оболочки».

Код диспетчера процессов теперь периодически считывает представление и может выдавать команды, если присутствуют определенные условия данных. По сути, простейшая версия диспетчера процессов - это некоторый код, который запускается по таймеру (например, каждый час) и зависит от конкретного представления (представлений). Это место, с которого я бы начал ... с того, что у вас уже есть (представления / средства обновления представления) и минимальных дополнений (код, который запускается периодически). Даже если позже вы решите, что вам нужны другие возможности для определенных случаев использования, «Будущее вы» будет лучше понимать конкретные недостатки, которые необходимо устранить.

И это отличное место, чтобы напомнить вам о законе Галла и наверное тоже ЯГНИ.

  1. Есть ли в сети другие справочные материалы по реализации саг из событийного источника? С чем я могу проверить свои идеи?

Трудно найти хороший материал, поскольку эти концепции имеют очень гибкую реализацию, и есть различные примеры, многие из которых чрезмерно спроектированы для общих целей. Однако вот несколько ссылок, которые я использовал в ответе.

DDD - развивающиеся бизнес-процессы
DDD / CQRS Google Group (много материалов для чтения)


Обратите внимание, что термин "сага" имеет иное значение, чем "диспетчер процессов". Обычная реализация саги - это, по сути, маршрутная квитанция, в которой каждый шаг и соответствующая компенсация отказов включены в квитанцию. Это зависит от каждого получателя промахов маршрутизации, выполняющего то, что указано в промахах маршрутизации, и успешной передачи их на следующий переход или выполнения компенсации сбоев и маршрутизации в обратном направлении. Это может быть слишком оптимистичным при работе с несколькими системами, управляемыми разными группами, поэтому вместо них часто используются менеджеры процессов. См. это SO вопрос для получения дополнительной информации.

person Kasey Speakman    schedule 06.11.2015
comment
Мне потребовалось время, чтобы добраться до этого, а затем увидеть ваше мышление. И я согласен, вместо того, чтобы беспокоиться о командах, которые вошли, мы должны вместо этого беспокоиться о том, чтобы сохранить тот факт, что мы перешли в состояние в саге, и просто воспроизвести эти события перехода (в отличие от команд, которые их вызвали). - person morleyc; 09.03.2016