Как с помощью Akka Streams узнать, что источник завершен?

У меня есть Alpakka Elasticsearch Sink, который я храню между запросами. Когда я получаю запрос, я создаю Source из HTTP-запроса и превращаю его в Source Elasticsearch WriteMessage, а затем запускаю его с mySource.runWith(theElasticseachSink).

  1. Как мне получить уведомление, когда источник будет завершен? Кажется, ничего полезного не материализовалось.
  2. Будет ли завершение источника передаваться в приемник, то есть мне придется каждый раз создавать новый?
  3. Если да, поможет ли как-то разъединить их с помощью Flow.fromSourceAndSink?

Моя цель - узнать, когда загрузка HTTP завершена (включая vias, через которые она проходит), и иметь возможность повторно использовать приемник.


person Isvara    schedule 02.02.2019    source источник


Ответы (2)


вы можете обойти отдельные части потока по своему желанию, вы даже можете обойти весь исполняемый граф (это неизменяемые). Вызов run() материализует поток, но не изменяет ваш график или его части.

1) Поскольку вы хотите знать, когда HttpDownload прошел поток, почему бы не использовать полные графики Future[Done]? Предполагая, что ваш вызов elasticsearch является асинхронным, это должно быть равно, поскольку ваш приемник просто запускает вызов и не ждет. Вы также можете использовать Source.queue (https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html) и просто добавьте свои сообщения в очередь, которая затем повторно использует определенный график, чтобы вы могли добавлять новые сообщения, когда обработка нужный. Это также материализует SourceQueueWithComplete, позволяя вам остановить поток. Кроме того, повторно используйте приемник везде, где это необходимо, не дожидаясь, пока его использует другой поток.

2) Как описано выше: нет, вам не нужно создавать экземпляр приемника несколько раз.

С уважением, Энди

person awagen    schedule 02.02.2019
comment
Как мне на самом деле получить Future[Done]? Источник Akka-http ResponseEntity dataBytes материализует Any (def dataBytes: Source[ByteString, Any]), и я понятия не имею, почему и что это должно быть. - person Isvara; 02.02.2019
comment
Ну, здесь я имел в виду материализованную стоимость раковины. Судя по тому, что вы описываете, вам это действительно нужно в Исходнике? Для приемника вы можете использовать Sink.foreach , который материализует Future[Done] . Пожалуйста, постарайтесь указать, почему именно в этом есть необходимость, на исходнике, если вам это нужно там. - person awagen; 02.02.2019
comment
Если вам это действительно нужно, вы можете добавить viaMat и написать собственную материализацию для вашего источника, которая обеспечивает статус, а затем передает его вниз по потоку через (Keep.right) или (Keep.left). Сегодня вечером у меня не будет времени написать пример, но если он вам нужен, пожалуйста, опубликуйте пример (упрощенного) кода. - person awagen; 02.02.2019
comment
Я разместил здесь небольшую копию: github.com/danellis/akka-es-test. Как видите, при запуске я получаю NotUsed, так что мне там не с чем работать. - person Isvara; 03.02.2019

Оказывается, библиотека Elasticsearch от Alpakka также поддерживает формы потока, так что я могу запустить свой источник через нее и запустить через любой приемник, который материализует будущее. Sink.foreach отлично работает здесь для целей тестирования, например, как в https://github.com/danellis/akka-es-test.

Flow fromFunction { product: Product =>
    WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")

чтобы определить es.flow, а затем

val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
    .via(scanner)
    .via(CsvToMap.toMap(Utf8))
    .map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
    .via(es.flow)

val futureDone = graph.runWith(Sink.foreach(println))

futureDone onComplete {
    case Success(_) => println("Done")
    case Failure(e) => println(e)
}
person Isvara    schedule 03.02.2019