Поиск событий и cqrs с помощью eventstore db

store db и event-sourcing, но у меня есть сомнения относительно прогнозов и cqrs. Пока что я вызываю свой коммандос и свой обработчик команд следующим образом:

создать-пользователя-команду

export class CreateUserCommand implements ICommand {
  constructor(
    public readonly userDto: UserStruct,
  ) {}
}

обработчик команд:

export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(private readonly publisher: EventPublisher) {}

  async execute(command: CreateUserCommand) {
    const { userDto } = command;
    const user = User.create(userDto);
        console.log(user.value)
    if (user.isLeft()) throw user.value;
    const userPublisher = this.publisher.mergeObjectContext(user.value);
        userPublisher.commit()
  }
}

событие:

export class UserCreatedEvent implements IEvent {
  static readonly NAME = "UniFtcIdade/user-registered";
  readonly $name = UserCreatedEvent.NAME;
  readonly $version = 0;
  constructor(
    public readonly aggregateId: string,
    public readonly state: { email: string; name: string },
    public readonly date: Date
  ) {
  }
}

домен:

export class User extends AggregateRoot {
  public readonly name: string;
  public readonly email: string;

  private constructor (guid: string, name: string, email: string) {
    super()
    this.apply(new UserCreatedEvent(guid, {email, name}, new Date()));
  }
  static create(
    dto: UserStruct
  ): Either<InvalidNameError | InvalidEmailError, User> {
    const name: Either<InvalidNameError, Name> = Name.create(dto.name);
    const email: Either<InvalidEmailError, Email> = Email.create(dto.email);
    if (name.isLeft()) return left(name.value);
    if (email.isLeft()) return left(email.value);
    const user = new User(v4(),name.value.value, email.value.value);
    return right(user);
  }
}

Но я сомневаюсь, как прогнозы войдут в эту ситуацию. Проекция используется для получения текущего состояния агрегата ??? У меня должен быть db как mongodb для сохранения текущего состояния, то есть каждый раз, когда я вызываю свой обработчик команд и меняю текущее состояние в mongodb ??? Предусмотрена ли для этого проекция событийb? сохранить текущее состояние агрегата ??


person Ming    schedule 13.03.2021    source источник


Ответы (1)


В CQRS при использовании EventStoreDb ваш агрегат должен быть спроектирован таким образом, чтобы его можно было восстановить в состояние из событий. События хранятся в потоке с уникальным именем и идентификатором (guid). При изменении агрегата вы должны прочитать этот поток и последовательно применить каждое событие для восстановления текущего состояния, прежде чем выполнять какие-либо изменения агрегата (который генерирует больше событий). Чтобы поддерживать целостность и обрабатывать оптимистичный параллелизм, у вас должна быть простая проверка версии в вашем агрегате, которая подсчитывает старые события + новые события, чтобы убедиться, что последний номер версии будет сохранен.

Проблемы, которые я вижу выше, заключаются в следующем. У вашего агрегата есть конструктор и статический метод, который генерирует события без какой-либо проверки текущего состояния, то есть: что произойдет, если я вызову create дважды с одним и тем же guid?

this.apply(new UserCreatedEvent(guid, {email, name}, new Date()));

Вы применяете государство здесь напрямую. Вместо этого вы должны вызвать событие внутри вашего метода Create.

this.raiseEvent(new UserCreatedEvent(guid, {email, name}, new Date()));

Это должно быть реализовано для следующих целей.

  • Добавлен в список незафиксированных событий
  • this.apply называется

Затем вы должны сохранить события в EventStoreDb в своем обработчике команд.

async execute(command: CreateUserCommand) {
    const { userDto } = command;
    const user = eventRepository.Get<User>(command.Id);
    user.Create(userDto); // Can now check current state and fail if required.
    eventRepository.Save(user)
  }

Репозиторий здесь простой. Он может создать пустого пользователя и применить все события по порядку перед возвратом пользователя. Сохранение должно просто прочитать список незафиксированных событий и сохранить их в пользовательском потоке.

Это командная сторона сделана. Для стороны чтения вы можете использовать готовую проекцию категории всех пользователей и записать их в mongo для чтения другим API (не вашим обработчиком команд).

person Paul Connolly    schedule 16.03.2021
comment
Спасибо, я заметил это из своего статического метода, не могли бы вы задать мне еще один вопрос? Я пытаюсь применить структуру микросервиса с этой структурой cqrs и источником событий, но у меня есть сомнения, например, моя команда создана, в асинхронном сценарии я должен опубликовать сообщение по моей теме и прослушать мою службу или контроллер, и когда я получу ответ, вызову ли я свой обработчик команд? Или мне все это делать в обработчике команд? Другими словами, мне нужен ответ от другого микросервиса, чтобы продолжить работу с обработчиком команд. - person Ming; 17.03.2021
comment
У вашего микросервиса должен быть API, который вы можете прочитать, прежде чем создавать свою команду и обслуживать ее на шине, вашему обработчику команд не нужно читать из другой микросервиса. Имейте в виду, что вы можете работать с устаревшими данными, но это совсем другая проблема. - person Paul Connolly; 22.03.2021