Я написал агрегат с источником событий, а теперь реализовал сагу с источником событий ... Я заметил, что они похожи друг на друга, и создал объект с источником событий в качестве базового класса, от которого оба являются производными.
Я видел здесь одну демонстрацию http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/, но чувствую, что может возникнуть проблема, поскольку команды могут быть потеряны в случае сбоя процесса, поскольку отправка команд находится вне транзакции записи?
public void Save(ISaga saga)
{
var events = saga.GetUncommittedEvents();
eventStore.Write(new UncommittedEventStream
{
Id = saga.Id,
Type = saga.GetType(),
Events = events,
ExpectedVersion = saga.Version - events.Count
});
foreach (var message in saga.GetUndispatchedMessages())
bus.Send(message); // can be done in different ways
saga.ClearUncommittedEvents();
saga.ClearUndispatchedMessages();
}
Вместо этого я использую EventStore Грега Янга, и когда я сохраняю EventSourcedObject (либо агрегат, либо сагу), последовательность действий выглядит следующим образом:
- Репозиторий получает список новых MutatingEvents.
- Записывает их в поток.
- EventStore запускает новые события, когда потоки записываются и фиксируются в потоке.
- Мы слушаем события из EventStore и обрабатываем их в EventHandlers.
Я реализую два аспекта саги:
- Принимать события, которые могут переходить в состояние, что, в свою очередь, может выдавать команды.
- Чтобы иметь будильник, когда в какой-то момент в будущем (через службу внешнего таймера) нам могут перезвонить).
Вопросы
Насколько я понимаю, обработчики событий не должны выдавать команды (что произойдет, если команда не удастся?), Но согласен ли я с вышеизложенным, поскольку сага - это то, что фактически контролирует создание команд (в ответ на события ) через этот прокси-сервер event, и любой сбой отправки команды может быть обработан извне (во внешнем обработчике событий, который имеет дело с
CommandEmittedFromSaga
и повторно отправляет команду в случае сбоя команды)?Или я забываю обертывать события и хранить собственные
Commands
иEvents
в одном потоке (смешанном с сообщением базового класса - сага будет потреблять и команды, и события, а агрегат будет потреблять только события)?Есть ли в сети другие справочные материалы по реализации саг из событийного источника? С чем я могу проверить свои идеи?
Ниже приведен некоторый фоновый код.
Saga выдает команду "Выполнить" (заключенная в событие CommandEmittedFromSaga)
Команда ниже заключена в событие:
public class CommandEmittedFromSaga : Event
{
public readonly Command Command;
public readonly Identity SagaIdentity;
public readonly Type SagaType;
public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
{
Command = command;
SagaType = sagaType;
SagaIdentity = sagaIdentity;
}
}
Saga запрашивает обратный вызов в будущем (событие AlarmRequestedBySaga)
Запрос обратного вызова по тревоге переносится на событие и запускает событие и запускает сагу в запрошенное время или после него:
public class AlarmRequestedBySaga : Event
{
public readonly Event Event;
public readonly DateTime FireOn;
public readonly Identity Identity;
public readonly Type SagaType;
public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
{
Identity = identity;
SagaType = sagaType;
Event = @event;
FireOn = fireOn;
}
}
В качестве альтернативы я могу хранить и команды, и события в одном потоке сообщения базового типа
public abstract class EventSourcedSaga
{
protected EventSourcedSaga() { }
protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
{
Identity = id;
if (messages == null) throw new ArgumentNullException(nameof(messages));
var count = 0;
foreach (var message in messages)
{
var ev = message as Event;
var command = message as Command;
if(ev != null) Transition(ev);
else if(command != null) _messages.Add(command);
else throw new Exception($"Unsupported message type {message.GetType()}");
count++;
}
if (count == 0)
throw new ArgumentException("No messages provided");
// All we need to know is the original number of events this
// entity has had applied at time of construction.
_unmutatedVersion = count;
_constructing = false;
}
readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
readonly List<Message> _messages = new List<Message>();
readonly int _unmutatedVersion;
private readonly bool _constructing = true;
public readonly Identity Identity;
public IList<Message> GetMessages()
{
return _messages.ToArray();
}
public void Transition(Event e)
{
_messages.Add(e);
_dispatcher.Dispatch(this, e);
}
protected void SendCommand(Command c)
{
// Don't add a command whilst we are in the constructor. Message
// state transition during construction must not generate new
// commands, as those command will already be in the message list.
if (_constructing) return;
_messages.Add(c);
}
public int UnmutatedVersion() => _unmutatedVersion;
}