Нехватка памяти при загрузке большого количества записей из базы данных

Я использую slick в Akka Streams для загрузки большого количества записей (~ 2M) из базы данных (postgresql) и записи их в файл S3. Тем не менее, я заметил, что мой код ниже работает для записей около ~ 50 тыс., Но не работает для чего-либо выше отметки 100 тыс.

  val allResults: Future[Seq[MyEntityImpl]] =
    MyRepository.getAllRecordss()

  val results: Future[MultipartUploadResult] = Source
    .fromFuture(allResults)
    .map(seek => seek.toList)
    .mapConcat(identity)
    .map(myEntity => myEntity.toPSV + "\n")
    .map(s => ByteString(s))
    .runWith(s3Sink)

Ниже приведен пример того, как выглядит myEntity:

case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
  def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}

Я ищу способ сделать это более реактивным способом, чтобы он не исчерпал память.


person Anthony    schedule 19.02.2019    source источник


Ответы (1)


Основная проблема

Проблема в том, что вы извлекаете все записи из базы данных в локальную память перед их отправкой в ​​s3Sink.

Первое место, где данные загружаются в память, скорее всего, находится в вашем методе MyRepository.getAllRecords(). Большинство, если не все, реализации Seq основаны на оперативной памяти. Второе место, где вы определенно используете локальную память, находится в seek.toList, потому что List хранит все данные в памяти.

Решение

Вместо того, чтобы возвращать Seq из getAllRecords вы должны возвращать гладкое на основе акка Source напрямую. Это гарантирует, что вашему материализованному потоку потребуется память только для переходных шагов обработки перед переходом к s3.

Если ваше определение метода изменилось на:

def getAllRecords() : Source[MyEntityImpl, _]

Тогда остальная часть потока будет работать реактивным образом:

MyRepository
  .getAllRecords()
  .map(myEntity => myEntity.toPSV + "\n")
  .map(ByteString.apply)
  .runWith(s3Sink)
person Ramón J Romero y Vigil    schedule 19.02.2019