Akka Streams с индексацией Alpakka в ES: имя индекса оценивается только при запуске выполнения

Я написал код с Akka Streams и Alpakka, который считывает данные из Amazon SQS и индексирует события в Elasticsearch. Все работает гладко и производительность потрясающая, но у меня проблема с именами индексов. У меня есть этот код:

class ElasticSearchIndexFlow(restClient: RestClient) {

  private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

  def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
    ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
      restClient,
      DomainEventMarshaller.domainEventWrites
    )

  private def index = {
    val now = DateTime.now()
    s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
  }
}

Проблема в том, что после нескольких дней работы потока имя индекса не меняется. Я предполагаю, что Akka Streams создает под капотом слитый актор, и что функция index для получения имени индекса оценивается только в начале выполнения.

Любая идея о том, что я могу сделать для индексации событий в ES с именем индекса в соответствии с текущей датой?


person SergiGP    schedule 18.04.2018    source источник


Ответы (1)


Решение проблемы заключается в установке имени индекса на предыдущем шаге с помощью IncomingMessage.withIndexName

So:

def flow: Flow[(DomainEvent, Message), IncomingMessage[DomainEvent, Message], NotUsed] =
  Flow[(DomainEvent, Message)].map {
    case (domainEvent, message) =>
      IncomingMessage(Some(domainEvent.eventId), domainEvent, message)
        .withIndexName(indexName(domainEvent.ocurredOn))
}

А также:

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
  ElasticsearchFlow.create[DomainEvent]("this-index-name-is-not-used", "domain-event", elasticSettings)(
    restClient,
    DomainEventMarshaller.domainEventWrites
  )
person SergiGP    schedule 19.04.2018