S3/MinIO с Java/Scala: сохранение байтовых буферов фрагментов файлов в хранилище объектов

Итак, представьте, что у меня есть Scala Vert.x Web REST API, который получает загрузку файлов через составные HTTP-запросы. Однако он не получает входящие данные файла в виде одного файла InputStream. Вместо этого каждый файл принимается как серия байтовых буферов, передаваемых через несколько функций обратного вызова.

Обратные вызовы в основном выглядят так:

  // the callback that receives byte buffers (chunks) of the file being uploaded
  //  it is called multiple times until the full file has been received
  upload.handler { buffer =>
    // send chunk to backend
  }

  // the callback that gets called after the full file has been uploaded
  //  (i.e. after all chunks have been received)
  upload.endHandler { _ =>
    // do something after the file has been uploaded
  }

  // callback called if an exception is raised while receiving the file
  upload.exceptionHandler { e =>
    // do something to handle the exception
  }

Теперь я хотел бы использовать эти обратные вызовы, чтобы сохранить файл в корзине MinIO (MinIO, если вы не знакомы, в основном представляет собой самостоятельный S3, и его API почти такой же, как S3 Java API).

Поскольку у меня нет дескриптора файла, мне нужно использовать putObject(), чтобы поместить InputStream в MinIO.

Неэффективный обходной путь, который я сейчас использую с MinIO Java API, выглядит следующим образом:

// this is all inside the context of handling a HTTP request
val out = new PipedOutputStream()
val in = new PipedInputStream()
var size = 0
in.connect(out)

upload.handler { buffer =>
    s.write(buffer.getBytes)
    size += buffer.length()
}

upload.endHandler { _ =>
    minioClient.putObject(
        PutObjectArgs.builder()
            .bucket("my-bucket")
            .object("my-filename")
            .stream(in, size, 50000000)
            .build())
}

Очевидно, что это не оптимально. Поскольку здесь я использую простой поток java.io, весь файл загружается в память.

Я не хочу сохранять файл на диск на сервере, прежде чем помещать его в хранилище объектов. Я хотел бы поместить его прямо в свое хранилище объектов.

Как я могу добиться этого, используя S3 API и серию байтовых буферов, предоставленных мне через обратный вызов upload.handler?

ИЗМЕНИТЬ

Я должен добавить, что я использую MinIO, потому что я не могу использовать коммерческое облачное решение, такое как S3. Однако, как упоминалось на веб-сайте MinIO, я могу использовать Amazon S3 Java SDK, используя MinIO в качестве решения для хранения данных.

Я попытался выполнить это руководство на веб-сайте Amazon для загрузки объектов. до S3 кусками.

Это решение, которое я попытался, выглядит так:

      context.request.uploadHandler { upload =>
        println(s"Filename: ${upload.filename()}")

        val partETags = new util.ArrayList[PartETag]
        val initRequest = new InitiateMultipartUploadRequest("docs", "my-filekey")
        val initResponse = s3Client.initiateMultipartUpload(initRequest)

        upload.handler { buffer =>
          println("uploading part", buffer.length())
          try {
            val request = new UploadPartRequest()
              .withBucketName("docs")
              .withKey("my-filekey")
              .withPartSize(buffer.length())
              .withUploadId(initResponse.getUploadId)
              .withInputStream(new ByteArrayInputStream(buffer.getBytes()))

            val uploadResult = s3Client.uploadPart(request)
            partETags.add(uploadResult.getPartETag)
          } catch {
            case e: Exception => println("Exception raised: ", e)
          }
        }

        // this gets called for EACH uploaded file sequentially
        upload.endHandler { _ =>
          // upload successful
          println("done uploading")
          try {
            val compRequest = new CompleteMultipartUploadRequest("docs", "my-filekey", initResponse.getUploadId, partETags)
            s3Client.completeMultipartUpload(compRequest)
          } catch {
            case e: Exception => println("Exception raised: ", e)
          }
          context.response.setStatusCode(200).end("Uploaded")
        }
        upload.exceptionHandler { e =>
          // handle the exception
          println("exception thrown", e)
        }
      }
    }

Это работает для небольших файлов (мой тестовый маленький файл был 11 байт), но не для больших файлов.

В случае больших файлов процессы внутри upload.handler постепенно замедляются по мере того, как файл продолжает загружаться. Кроме того, upload.endHandler никогда не вызывается, и файл почему-то продолжает загружаться после того, как 100% файла были загружены.

Однако, как только я закомментирую часть s3Client.uploadPart(request) внутри upload.handler и части s3Client.completeMultipartUpload внутри upload.endHandler (фактически выбрасывая файл, а не сохраняя его в хранилище объектов), загрузка файла проходит как обычно и завершается корректно.


person foxtrotuniform6969    schedule 05.12.2020    source источник
comment
Я добавил попытку использовать Java API AWS S3 для помещения файлового объекта в MinIO.   -  person foxtrotuniform6969    schedule 05.12.2020
comment
zengularity.github.io/benji/s3/usage.html работает в потоковый способ с любым сервисом, совместимым с S3 (AWS, Ceph, Minio), с использованием Akka-Stream   -  person cchantep    schedule 06.12.2020
comment
ХОРОШО. Есть примеры использования без Akka?   -  person foxtrotuniform6969    schedule 06.12.2020
comment
Да, точно. Я не просил рекомендации библиотеки. Зачем мне использовать Akka для решения этой одной проблемы, если я ничего не получу от всего остального?   -  person foxtrotuniform6969    schedule 06.12.2020
comment
Akka, FS2... спрашивать, почему не использовать потоковую либу для стрима, мне кажется по меньшей мере странным...   -  person cchantep    schedule 06.12.2020
comment
Я уже использую Vert.X (как вы, возможно, заметили, основываясь на обратных вызовах), и я бы не стал использовать какую-либо большую дополнительную библиотеку, если она действительно не нужна. До сих пор я мог обрабатывать WebSockets, Auth и все остальное, что мне нужно. Я бы не хотел задействовать большую библиотеку для решения проблемы только с одной конечной точкой/маршрутом HTTP.   -  person foxtrotuniform6969    schedule 06.12.2020
comment
Считаете ли вы, что мне следует повторно опубликовать вопрос, уделив больше внимания Vert.X? Это поможет?   -  person foxtrotuniform6969    schedule 06.12.2020
comment
Как такая библиотека, как FS2, решила бы это по-другому?   -  person foxtrotuniform6969    schedule 06.12.2020


Ответы (1)


Я понял, что я делал неправильно (при использовании клиента S3). Я не накапливал байты внутри моего upload.handler. Мне нужно накапливать байты до тех пор, пока размер буфера не станет достаточно большим для загрузки части, а не загружать каждый раз, когда я получаю несколько байтов.

Поскольку ни клиент Amazon S3, ни клиент MinIO не делали того, что мне нужно, я решил изучить, как на самом деле был реализован putObject(), и сделать свой собственный. Это то, что я придумал.

Эта реализация специфична для Vert.X, однако ее можно легко обобщить для работы со встроенными java.io InputStreams через while цикл и с использованием пары Piped- потоков.

Эта реализация также специфична для MinIO, но ее можно легко адаптировать для использования клиента S3, поскольку по большей части эти два API одинаковы.

В этом примере Buffer в основном представляет собой контейнер вокруг ByteArray, и я не делаю здесь ничего особенного. Я заменил его массивом байтов, чтобы гарантировать, что он все еще будет работать, и это сработало.

package server

import com.google.common.collect.HashMultimap
import io.minio.MinioClient
import io.minio.messages.Part
import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream

import scala.collection.mutable.ListBuffer

class CustomMinioClient(client: MinioClient) extends MinioClient(client) {
  def putReadStream(bucket: String = "my-bucket",
                    objectName: String,
                    region: String = "us-east-1",
                    data: ReadStream[Buffer],
                    objectSize: Long,
                    contentType: String = "application/octet-stream"
                   ) = {
    val headers: HashMultimap[String, String] = HashMultimap.create()
    headers.put("Content-Type", contentType)
    var uploadId: String = null

    try {
      val parts = new ListBuffer[Part]()
      val createResponse = createMultipartUpload(bucket, region, objectName, headers, null)
      uploadId = createResponse.result.uploadId()

      var partNumber = 1
      var uploadedSize = 0

      // an array to use to accumulate bytes from the incoming stream until we have enough to make a `uploadPart` request
      var partBuffer = Buffer.buffer()

      // S3's minimum part size is 5mb, excepting the last part
      // you should probably implement your own logic for determining how big
      // to make each part based off the total object size to avoid unnecessary calls to S3 to upload small parts.
      val minPartSize = 5 * 1024 * 1024

      data.handler { buffer =>

        partBuffer.appendBuffer(buffer)

        val availableSize = objectSize - uploadedSize - partBuffer.length

        val isMinPartSize = partBuffer.length >= minPartSize
        val isLastPart = uploadedSize + partBuffer.length == objectSize

        if (isMinPartSize || isLastPart) {

          val partResponse = uploadPart(
            bucket,
            region,
            objectName,
            partBuffer.getBytes,
            partBuffer.length,
            uploadId,
            partNumber,
            null,
            null
          )

          parts.addOne(new Part(partNumber, partResponse.etag))
          uploadedSize += partBuffer.length
          partNumber += 1

          // empty the part buffer since we have already uploaded it
          partBuffer = Buffer.buffer()
        }
      }


      data.endHandler { _ =>
        completeMultipartUpload(bucket, region, objectName, uploadId, parts.toArray, null, null)
      }

      data.exceptionHandler { exception =>
        // should also probably abort the upload here
        println("Handler caught exception in custom putObject: " + exception)
      }
    } catch {
      // and abort it here as well...
      case e: Exception =>
        println("Exception thrown in custom `putObject`: " + e)
        abortMultipartUpload(
          bucket,
          region,
          objectName,
          uploadId,
          null,
          null
        )
    }
  }
}

Все это можно использовать довольно легко.

Сначала настройте клиент:

  private val _minioClient = MinioClient.builder()
    .endpoint("http://localhost:9000")
    .credentials("my-username", "my-password")
    .build()

  private val myClient = new CustomMinioClient(_minioClient)

Затем, когда вы получаете запрос на загрузку:

      context.request.uploadHandler { upload =>
        myClient.putReadStream(objectName = upload.filename(), data = upload, objectSize = myFileSize)
        context.response().setStatusCode(200).end("done")
      }

Единственная загвоздка в этой реализации заключается в том, что вам нужно заранее знать размеры файлов для запроса.

Однако это можно легко решить так, как это сделал я, особенно если вы используете веб-интерфейс.

  • Прежде чем пытаться загрузить файлы, отправьте запрос на сервер, содержащий сопоставление имени файла с размером файла.
  • Этот предварительный запрос должен генерировать уникальный идентификатор для загрузки.
  • Сервер может сохранить группу имя файла-›размер файла, используя идентификатор загрузки в качестве индекса. - Сервер отправляет идентификатор загрузки обратно клиенту.
  • Клиент отправляет составной запрос на загрузку, используя идентификатор загрузки.
  • Сервер извлекает список файлов и их размеров и использует его для вызова .putReadStream()
person foxtrotuniform6969    schedule 06.12.2020