Alpakka MongoDB — указать тип в MongoSource

В настоящее время я играю с Akka Streams и разъемом Alpakka MongoDB. .

Можно ли указать тип для MongoSource?

val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
  private val todoCollection: MongoCollection[TodoMongo] = mongoDb
    .withCodecRegistry(codecRegistry)
    .getCollection("todo")

Я хотел бы сделать что-то вроде этого:

val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here

Но я получаю следующую ошибку:

Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].

Я не могу найти правильную документацию об этой части.


person xDs    schedule 10.03.2018    source источник
comment
MongoSource(todoCollection.find()) прямолинеен. что вы хотите сделать с промежуточным FindObservable ?   -  person Knows Not Much    schedule 10.03.2018
comment
Или вы можете взглянуть на reactivemongo.org/releases/0.1x/documentation /учебник/   -  person cchantep    schedule 10.03.2018


Ответы (1)


Это еще не опубликовано, но в главной ветке Alpakka, MongoSource.apply принимает параметр типа:

object MongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

Поэтому в предстоящем выпуске Alpakka 0.18 вы сможете делать следующее:

val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())

Обратите внимание, что source здесь предполагает, что todoCollection.find() возвращает Observable[TodoMongo]; настроить типы по мере необходимости.

А пока вы можете просто добавить приведенный выше код вручную. Например:

package akka.stream.alpakka.mongodb.scaladsl

import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

Обратите внимание, что MyMongoSource определяется как находящийся в пакете akka.stream.alpakka.mongodb.scaladsl (например, MongoSource), потому что ObservableToPublisher — это частный класс пакета. Вы должны использовать MyMongoSource так же, как и MongoSource:

val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find()) 
person Jeffrey Chung    schedule 10.03.2018
comment
Отличный ответ. Искал что-то подобное. Но как добавить тип вручную с помощью следующего метода применения MongoSource? def apply (запрос: Observable[Document]): Source[Document, NotUsed] - person xDs; 10.03.2018
comment
Отлично работает с Alpakka 0.18. - person xDs; 31.03.2018