Разделение потока RxJava/RxKotlin в зависимости от подтипа

У меня есть поток ResponseMessage, который может быть разных подтипов. Я хотел бы разделить поток на потоки, где я могу обрабатывать каждый тип в своем собственном потоке.

Моя первая попытка привела к этому, чего я не вижу.

file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .groupBy { when(it) {
            is MarketChangeMessage -> it::class
            else -> it::class
        }}
        .map { it.????? } //How can possible this work?

Теперь мой вопрос: каков идиоматический способ разделить поток на потоки по одному конкретному подтипу?


person user3139545    schedule 14.11.2017    source источник


Ответы (1)


Вы можете использовать оператор ofType:

ofType( ) — испускать только те элементы из исходного Observable, которые относятся к определенному классу.

Пример:

val messages = file.readLines()
    .toObservable()
    .map { mapper.readValue(it, ResponseMessage::class.java) }
    .share() // <-- or other multicasting operator

messages
    .ofType(MarketChangeMessage::class)
    .subscribe()

messages
    .ofType(Other::class)
    .subscribe()
person ESala    schedule 14.11.2017