Я написал код с Akka Streams и Alpakka, который считывает данные из Amazon SQS и индексирует события в Elasticsearch. Все работает гладко и производительность потрясающая, но у меня проблема с именами индексов. У меня есть этот код:
class ElasticSearchIndexFlow(restClient: RestClient) {
private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)
def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
restClient,
DomainEventMarshaller.domainEventWrites
)
private def index = {
val now = DateTime.now()
s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
}
}
Проблема в том, что после нескольких дней работы потока имя индекса не меняется. Я предполагаю, что Akka Streams создает под капотом слитый актор, и что функция index
для получения имени индекса оценивается только в начале выполнения.
Любая идея о том, что я могу сделать для индексации событий в ES с именем индекса в соответствии с текущей датой?