Во многих случаях важен контекст выполнения кода. В серверных программах контекст может нести диагностическую информацию; в приложениях пользовательского интерфейса к виджетам можно прикоснуться только из определенного основного потока. Это может создать потенциальную проблему, когда ваш код станет больше, особенно когда вы разделяете производителей данных и потребителей данных. Kotlin Flows предназначены для обеспечения такой модульности, поэтому давайте посмотрим, как они ведут себя в отношении контекста выполнения.
Коллекционер
Например, приложение пользовательского интерфейса может запустить сопрограмму в основном потоке для сбора элементов из потока, который возвращается некоторой dataFlow()
функцией, и обновления отображения с ее значениями:
launch(Dispatchers.Main) { // launch in the main thread initDisplay() // prepare ui dataFlow().collect { // block of the collector begins updateDisplay(it) // update ui } }
Напомним ¹, что сбор потока запускает код эмиттера потока, который вызывает emit
для доставки значений в блок сборщика. Но что, если бы автору этого потока пришлось выполнить некоторые вычисления, потребляющие ресурсы процессора, и написать что-то вроде этого:
fun dataFlow(): Flow<Data> = flow { // create emitter withContext(Dispatchers.Default) { while (isActive) { emit(someDataComputation()) } } }
Эта реализация dataFlow()
функции вызывает someDataComputation()
в контексте диспетчера Default
, чтобы гарантировать, что он не блокирует важные потоки, такие как основной поток пользовательского интерфейса ².
Если такая реализация эмиттера потока разрешена, то updateDisplay
в коде сборщика попытается обновить пользовательский интерфейс из неправильного потока. Сама возможность реализации такого рода потока вынудила бы каждый сборщик потока написать какой-то шаблонный код, чтобы гарантировать, что выполнение его блока происходит в правильном контексте, или для установления некоторых разногласий в рамках всего проекта по поводу контекста, в котором элементы потока разрешены к выпуску. Эти соглашения трудно поддерживать, поскольку проект становится больше и учитываются сторонние библиотеки и операторы - некоторые из них могут вообще не документировать свой контекст эмиссии, но даже когда они это делают, это создает слишком большую когнитивную нагрузку на разработчиков, у которых есть всегда сверяться с документацией и не забывать явно указывать нужный контекст. Худшая часть этой истории заключается в том, что если вы забудете о контексте, вы можете получить код, который проходит все тесты, но иногда приводит к незаметным и трудно воспроизводимым ошибкам во время выполнения.
Вот почему такая реализация Kotlin Flow недопустима. Каждая реализация потока должна сохранять контекст коллектора. Невыполнение этого требования, как и использование withContext
, приводит к исключению времени выполнения, которое обычно проявляется во время первого тестового запуска. Это означает, что на самом деле функция flow { ... }
builder не передает значение непосредственно блоку сборщика на emit
, но содержит логику для проверки этого инварианта сохранения контекста.
С Kotlin Flows совершенно безопасно писать сборщик в том виде, в каком он был написан. Не стоит беспокоиться. Контекст, в котором вызывается collect
, будет сохранен для блока кода, переданного этому вызову.
Эмиттер
Но как правильно реализовать функцию dataFlow()
эмиттера? Для начала удалим withContext
и выбросим из контекста сборщика:
fun dataFlow(): Flow<Data> = flow { // create emitter while (isActive) { emit(someDataComputation()) } }
Однако someDataComputation
может заблокировать поток сборщика, заморозив пользовательский интерфейс. Есть два пути обойти это. Один из них - инкапсулировать соответствующий контекст в самом someDataComputation
:
fun someDataComputation(): Data = withContext(Dispatchers.Default) { // implementation here }
Это хорошо работает для изолированной функции, но неудобно, если весь код в flow { ... }
нуждается в каком-то конкретном контексте, а также неэффективно переключать контекст назад и вперед для каждого значения. Итак, есть другое решение, использующее оператор flowOn
в результирующем потоке:
fun dataFlow(): Flow<Data> = flow { // create emitter while (isActive) { emit(someDataComputation()) } }.flowOn(Dispatchers.Default) // ^ works on the flow before it
Функция flowOn
изменяет контекст для потока, к которому она применяется, обеспечивая при этом сохранение контекста для сборщика, который будет применяться после него. Реализация оператора flowOn
создает отдельную сопрограмму с указанным Dispatchers.Default
контекстом для сбора someDataComputation
потока при передаче в контексте исходного сборщика ³.
Операторы
То же правило сохранения контекста применяется к операторам в потоках. Рассмотрим следующий поток:
dataFlow() .map { opA(it) } // in contextA .flowOn(contextA) .map { opB(it) } // in collector's context
Здесь opB
вызывается в контексте сборщика, но на контекст opA
влияет оператор flowOn
.
В общем, правила для контекста выполнения с Kotlin Flows просты. Неблокирующий код, который не заботится о своем контексте выполнения, не требует особых мер предосторожности. Сборщики всегда могут быть уверены, что их контекст выполнения сохраняется. Для кода, которому требуется определенный контекст выполнения, существует оператор flowOn
, который можно поместить после соответствующего контекстно-зависимого кода, чтобы собрать поток над ним в указанном контексте.