Внедрение потокового вещания для управления Persistence Query Akka 2.4

Я исследовал экспериментальный модуль Akka Persistence Query и очень заинтересован в реализации пользовательского журнала чтения для своего приложения. В документации описаны два основных вида запросов: те, которые возвращают текущее состояние журнала (например, CurrentPersistenceIdsQuery), и те, которые возвращают поток с возможностью подписки, который генерирует события, когда события фиксируются в журнале через сторону записи приложения (например, AllPersistenceIdsQuery)

Для моего надуманного приложения я использую Postgres и Slick 3.1.1, чтобы управлять этими запросами. Я могу успешно передавать результаты запроса к базе данных, выполнив что-то вроде:

override def allPersistenceIds = {
  val db = Database.forConfig("postgres")
  val metadata = TableQuery[Metadata]

  val query = for (m <- metadata) yield m.persistenceId
  Source.fromPublisher(db.stream(query.result))
}

Однако поток считается завершенным, как только завершается базовое действие Slick DB. Похоже, это не соответствует требованию постоянно открытого потока, способного генерировать новые события.

Мои вопросы:

  • Есть ли способ сделать это исключительно с помощью Akka Streams DSL? То есть могу ли я отправить вверх поток, который нельзя закрыть?
  • Я немного изучил, как работает журнал чтения LevelDB, и, похоже, он обрабатывает новые события, подписывая журнал чтения на журнал записи. Это кажется разумным, но я должен спросить - вообще есть ли рекомендуемый подход для выполнения этого требования?
  • Другой подход, о котором я думал, - это опрос (например, периодически мой журнал чтения запрашивает БД и проверяет наличие новых событий/идентификаторов). Может ли кто-нибудь с большим опытом, чем я, дать совет?

Спасибо!


person simonl    schedule 05.01.2016    source источник


Ответы (2)


Это не так тривиально, как эта одна строка кода, но вы уже на правильном пути.

Чтобы реализовать «бесконечный» поток, вам нужно будет запрашивать несколько раз, т. Е. Осуществлять опрос, если базовая база данных не допускает бесконечный запрос (что здесь не делает AFAICS).

Опрос должен отслеживать «смещение», поэтому, если вы запрашиваете какой-либо тег и выдаете другой опрос, вам нужно начать этот (второй сейчас) запрос с «последнего испущенного элемента», а не с начала снова из-за стола. Так что вам нужно где-то, скорее всего, Актер, который сохраняет это смещение.

Плагин Query Side LevelDB — не лучший образец для подражания для других реализаций, поскольку он предполагает многое о базовом журнале и о том, как он работает. Кроме того, LevelDB не предназначен для работы с Akka Persistence — это журнал, который мы поставляем, чтобы иметь постоянный журнал, с которым можно работать из коробки (без запуска Cassandra и т. д.).

Если вы ищете вдохновение, плагины MongoDB на самом деле должны быть довольно хорошим источником для этого, поскольку они имеют те же ограничения, что и хранилища SQL. Я не уверен, что какой-либо из журналов SQL в настоящее время реализует сторону запроса.

person Konrad 'ktoso' Malawski    schedule 07.01.2016

Можно использовать API репликации Postgres для получения "бесконечного" потока событий базы данных. Он поддерживается драйвером JDBC Postgres, начиная с версии 42.0.0, см. соответствующий запрос на включение. Однако это не настоящий поток, а буферизованный синхронный ридер из базы данных WAL.

PGReplicationStream stream =
    pgConnection
        .replicationStream()
        .logical()
        .withSlotName("test_decoding")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();
while (true) {
  ByteBuffer buffer = stream.read();
  //process logical changes
}

Было бы неплохо иметь адаптер Akka Streams (Source) в проекте alpakka для этого читателя.

person Iurii Ant    schedule 03.03.2017