Во многих случаях важен контекст выполнения кода. В серверных программах контекст может нести диагностическую информацию; в приложениях пользовательского интерфейса к виджетам можно прикоснуться только из определенного основного потока. Это может создать потенциальную проблему, когда ваш код станет больше, особенно когда вы разделяете производителей данных и потребителей данных. Kotlin Flows предназначены для обеспечения такой модульности, поэтому давайте посмотрим, как они ведут себя в отношении контекста выполнения.

Коллекционер

Например, приложение пользовательского интерфейса может запустить сопрограмму в основном потоке для сбора элементов из потока, который возвращается некоторой dataFlow() функцией, и обновления отображения с ее значениями:

launch(Dispatchers.Main) { // launch in the main thread
    initDisplay() // prepare ui
    dataFlow().collect { // block of the collector begins
        updateDisplay(it) // update ui
    } 
}

Напомним ¹, что сбор потока запускает код эмиттера потока, который вызывает emit для доставки значений в блок сборщика. Но что, если бы автору этого потока пришлось выполнить некоторые вычисления, потребляющие ресурсы процессора, и написать что-то вроде этого:

fun dataFlow(): Flow<Data> = flow { // create emitter
    withContext(Dispatchers.Default) {
        while (isActive) {
            emit(someDataComputation())
        }
    }
}

Эта реализация dataFlow() функции вызывает someDataComputation() в контексте диспетчера Default, чтобы гарантировать, что он не блокирует важные потоки, такие как основной поток пользовательского интерфейса ².

Если такая реализация эмиттера потока разрешена, то updateDisplay в коде сборщика попытается обновить пользовательский интерфейс из неправильного потока. Сама возможность реализации такого рода потока вынудила бы каждый сборщик потока написать какой-то шаблонный код, чтобы гарантировать, что выполнение его блока происходит в правильном контексте, или для установления некоторых разногласий в рамках всего проекта по поводу контекста, в котором элементы потока разрешены к выпуску. Эти соглашения трудно поддерживать, поскольку проект становится больше и учитываются сторонние библиотеки и операторы - некоторые из них могут вообще не документировать свой контекст эмиссии, но даже когда они это делают, это создает слишком большую когнитивную нагрузку на разработчиков, у которых есть всегда сверяться с документацией и не забывать явно указывать нужный контекст. Худшая часть этой истории заключается в том, что если вы забудете о контексте, вы можете получить код, который проходит все тесты, но иногда приводит к незаметным и трудно воспроизводимым ошибкам во время выполнения.

Вот почему такая реализация Kotlin Flow недопустима. Каждая реализация потока должна сохранять контекст коллектора. Невыполнение этого требования, как и использование withContext, приводит к исключению времени выполнения, которое обычно проявляется во время первого тестового запуска. Это означает, что на самом деле функция flow { ... } builder не передает значение непосредственно блоку сборщика на emit, но содержит логику для проверки этого инварианта сохранения контекста.

С Kotlin Flows совершенно безопасно писать сборщик в том виде, в каком он был написан. Не стоит беспокоиться. Контекст, в котором вызывается collect, будет сохранен для блока кода, переданного этому вызову.

Эмиттер

Но как правильно реализовать функцию dataFlow() эмиттера? Для начала удалим withContext и выбросим из контекста сборщика:

fun dataFlow(): Flow<Data> = flow { // create emitter
    while (isActive) {
        emit(someDataComputation())
    }
}

Однако someDataComputation может заблокировать поток сборщика, заморозив пользовательский интерфейс. Есть два пути обойти это. Один из них - инкапсулировать соответствующий контекст в самом someDataComputation:

fun someDataComputation(): Data = 
    withContext(Dispatchers.Default) { 
        // implementation here
    }

Это хорошо работает для изолированной функции, но неудобно, если весь код в flow { ... } нуждается в каком-то конкретном контексте, а также неэффективно переключать контекст назад и вперед для каждого значения. Итак, есть другое решение, использующее оператор flowOn в результирующем потоке:

fun dataFlow(): Flow<Data> = flow { // create emitter
    while (isActive) {
        emit(someDataComputation())
    }
}.flowOn(Dispatchers.Default) // ^ works on the flow before it

Функция flowOn изменяет контекст для потока, к которому она применяется, обеспечивая при этом сохранение контекста для сборщика, который будет применяться после него. Реализация оператора flowOn создает отдельную сопрограмму с указанным Dispatchers.Default контекстом для сбора someDataComputation потока при передаче в контексте исходного сборщика ³.

Операторы

То же правило сохранения контекста применяется к операторам в потоках. Рассмотрим следующий поток:

dataFlow()
    .map { opA(it) } // in contextA
    .flowOn(contextA) 
    .map { opB(it) } // in collector's context

Здесь opB вызывается в контексте сборщика, но на контекст opA влияет оператор flowOn.

В общем, правила для контекста выполнения с Kotlin Flows просты. Неблокирующий код, который не заботится о своем контексте выполнения, не требует особых мер предосторожности. Сборщики всегда могут быть уверены, что их контекст выполнения сохраняется. Для кода, которому требуется определенный контекст выполнения, существует оператор flowOn, который можно поместить после соответствующего контекстно-зависимого кода, чтобы собрать поток над ним в указанном контексте.

Дополнительная литература и примечания

  1. ^ Простой дизайн Kotlin Flow
  2. ^ Блокировка потоков, приостановка сопрограмм
  3. ^ Потоки Котлина и сопрограммы