Проблемы с загрузкой/загрузкой файлов в akka-http/akka-streams

Я пытаюсь использовать akka-streams и akka-http и библиотеку alpakka для загрузки/выгрузки файлов на Amazon S3. Я вижу две проблемы, которые могут быть связаны...

  • Я могу скачивать только очень маленькие файлы, самый большой 8kb.
  • Я не могу загружать большие файлы. Это не удается с сообщением

    Ошибка при обработке запроса: «Источник подпотока не материализовался в течение 5000 миллисекунд». Завершение с ответом 500 Internal Server Error. Чтобы изменить поведение обработки исключений по умолчанию, укажите пользовательский ExceptionHandler. akka.stream.impl.SubscriptionTimeoutException: источник подпотока не материализовался за 5000 миллисекунд

Вот мои маршруты

pathEnd {
           post {
             fileUpload("attachment") {
               case (metadata, byteSource) => {
                 val writeResult: Future[MultipartUploadResult] = byteSource.runWith(client.multipartUpload("bucketname", key))
                 onSuccess(writeResult) { result =>
                   complete(result.location.toString())
                 }
               }

             }
           }

         } ~

     path("key" / Segment) {
            (sourceSystem, sourceTable, sourceId) =>
              get {
                val result: Future[ByteString] = 
         client.download("bucketname", key).runWith(Sink.head)
                onSuccess(result) {
                  complete(_)
                }
              }
          }

Попытка загрузить файл, скажем, 100 КБ, в конечном итоге приведет к получению усеченной версии файла, обычно размером около 16-25 КБ. Любая помощь приветствуется.

Изменить: по поводу проблемы с загрузкой я принял предложение Стефано и получил

[error]  found   : akka.stream.scaladsl.Source[akka.util.ByteString,akka.NotUsed]
[error]  required: akka.http.scaladsl.marshalling.ToResponseMarshallable

Это заставило его работать

complete(HttpEntity(ContentTypes.`application/octet-stream`, client.download("bucketname", key).runWith(Sink.head)))

person zerayaqob    schedule 19.09.2017    source источник


Ответы (1)


1) По вопросу загрузки: по телефону

val result: Future[ByteString] = 
         client.download("bucketname", key).runWith(Sink.head)

вы передаете все данные из S3 в память, а затем передаете результат.

Akka-Http в качестве поддержки потоковой передачи, которая позволяет вам передавать байты прямо из источника, не буферизуя их все в памяти. Дополнительную информацию об этом можно найти в документы. На практике это означает, что директива complete может принимать директиву Source[ByteString, _], как в

...
get {
  complete(client.download("bucketname", key))
}

2) По вопросу загрузки: вы можете попробовать настроить Akka HTTP akka.http.server.parsing.max-content-length:

# Default maximum content length which should not be exceeded by incoming request entities.
# Can be changed at runtime (to a higher or lower value) via the `HttpEntity::withSizeLimit` method.
# Note that it is not necessarily a problem to set this to a high value as all stream operations
# are always properly backpressured.
# Nevertheless you might want to apply some limit in order to prevent a single client from consuming
# an excessive amount of server resources.
#
# Set to `infinite` to completely disable entity length checks. (Even then you can still apply one
# programmatically via `withSizeLimit`.)
max-content-length = 8m

Результирующий код для проверки этого будет выглядеть примерно так:

  withoutSizeLimit {
    fileUpload("attachment") {
      ...
    }
  }
person Stefano Bonetti    schedule 19.09.2017