Как скопировать некоторые записи из таблицы A в таблицу B с помощью slick-streaming и akka-streaming

Есть две таблицы TableA и TableB.

Мне нужно скопировать некоторые записи из TableA в TableB. Я использую slick-3.0 и использую следующим образом:

import akka.stream._
import akka.stream.scaladsl._
...

//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
      db.stream(q.result).mapResult { r => 
        val record: RecordA = someTransformation(r) 
        record
      }
    }.grouped(50) // grouping because I want to write records in batch mode
//}}

//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
      //TODO how to write batch to TableB asynchronously?
      val insertAction = TableB ++= batch  // insert batch to table
      val fInsert: Future[_] = db.run(insertAction)
      Await.result(fInsert, ...)           // #1 this works only with blocking
})
//}}

Но я столкнулся с проблемой - как записать пакет в TableB асинхронно (см. TODO). Теперь приведенный выше код работает только с блокировкой во внутреннее будущее (см. комментарий №1). Есть ли правильный способ асинхронной реализации этой задачи?


person John Mullins    schedule 08.06.2017    source источник
comment
Что произойдет, если вы не заблокируете внутреннее будущее?   -  person thwiegan    schedule 08.06.2017
comment
@thwiegan, если я не заблокирую внутреннее будущее и не верну его, оно не будет завершено   -  person John Mullins    schedule 08.06.2017
comment
Кажется, это ваш вариант использования: stackoverflow.com/questions/36400152/ Я не вижу ничего слишком отличного от вашего примера   -  person thwiegan    schedule 08.06.2017
comment
@thwiegan, спасибо за ответ. Но, к сожалению, этот пример тоже не работает. Это работает, только если внутреннее будущее заблокировано. В противном случае данные не сохраняются в TableB.   -  person John Mullins    schedule 08.06.2017


Ответы (1)


используйте mapAsync, он ожидает, что будет возвращено будущее, и предоставляет «развернутый» результат на следующем этапе.

source.mapAsync(4){batch: Seq[RecordA] =>
      val insertAction = TableB ++= batch  // insert batch to table
      db.run(insertAction)
}).to(Sink.ignore).run
person raam86    schedule 08.06.2017