Я исследовал экспериментальный модуль Akka Persistence Query и очень заинтересован в реализации пользовательского журнала чтения для своего приложения. В документации описаны два основных вида запросов: те, которые возвращают текущее состояние журнала (например, CurrentPersistenceIdsQuery
), и те, которые возвращают поток с возможностью подписки, который генерирует события, когда события фиксируются в журнале через сторону записи приложения (например, AllPersistenceIdsQuery
)
Для моего надуманного приложения я использую Postgres и Slick 3.1.1, чтобы управлять этими запросами. Я могу успешно передавать результаты запроса к базе данных, выполнив что-то вроде:
override def allPersistenceIds = {
val db = Database.forConfig("postgres")
val metadata = TableQuery[Metadata]
val query = for (m <- metadata) yield m.persistenceId
Source.fromPublisher(db.stream(query.result))
}
Однако поток считается завершенным, как только завершается базовое действие Slick DB. Похоже, это не соответствует требованию постоянно открытого потока, способного генерировать новые события.
Мои вопросы:
- Есть ли способ сделать это исключительно с помощью Akka Streams DSL? То есть могу ли я отправить вверх поток, который нельзя закрыть?
- Я немного изучил, как работает журнал чтения LevelDB, и, похоже, он обрабатывает новые события, подписывая журнал чтения на журнал записи. Это кажется разумным, но я должен спросить - вообще есть ли рекомендуемый подход для выполнения этого требования?
- Другой подход, о котором я думал, - это опрос (например, периодически мой журнал чтения запрашивает БД и проверяет наличие новых событий/идентификаторов). Может ли кто-нибудь с большим опытом, чем я, дать совет?
Спасибо!