В этой статье рассматривается одна из основ реактивного программирования: потоки, которые являются объектами типа Stream.

Если вы читали предыдущую статью о фьючерсах, возможно, вы помните, что каждое будущее представляет собой одно значение (либо ошибку, либо данные), которое оно передает асинхронно. Потоки работают аналогично, но вместо одного элемента поток может передавать ноль или более значений и ошибок за время.

Это третья статья, основанная на серии видео Flutter in Focus Асинхронное программирование в Dart. В первой статье Изоляты и циклы событий были рассмотрены основы поддержки фоновой работы Dart. Во втором, Futures, речь шла о классе Future.

Если вы предпочитаете учиться, наблюдая или слушая, все, что описано в этой статье, рассматривается в следующем видео.

Если вы думаете о том, как одно значение соотносится с итератором того же типа, то именно так будущее соотносится с потоком.

Как и в случае с фьючерсами, ключ заключается в том, чтобы заранее решить: «Вот что делать, когда часть данных готова, когда возникает ошибка и когда поток завершается».

Также, как и в случае с фьючерсами, цикл событий Dart все еще запускает шоу.

Если вы используете метод openRead() класса File для чтения данных из файла, например, этот метод возвращает поток.

Фрагменты данных считываются с диска и попадают в цикл обработки событий. Библиотека Dart смотрит на них и говорит: «А, меня это уже кое-кто ждет», добавляет данные в поток, и они появляются в коде вашего приложения.

Когда приходит другой фрагмент данных, он уходит и выходит. Потоки на основе таймера, потоковая передача данных из сетевого сокета - они также работают с циклом событий, используя часы и сетевые события.

Прослушивание потоков

Поговорим о том, как работать с данными, предоставляемыми потоком. Допустим, у вас есть класс, который дает вам поток, который выдает новое целое число раз в секунду: 1, 2, 3, 4, 5…

Вы можете использовать метод listen() для подписки на поток. Единственный обязательный параметр - это функция.

Каждый раз, когда поток передает новое значение, функция вызывается и выводит значение:

Data: 1
Data: 2
Data: 3
Data: 4
...

Вот как работает listen().

Важно: по умолчанию потоки настроены для единой подписки. Они придерживаются своих ценностей до тех пор, пока кто-нибудь не подпишется на них, и допускают только одного слушателя на всю жизнь. Если вы попытаетесь прослушать поток дважды, вы получите исключение.

К счастью, Dart также предлагает трансляции. Вы можете использовать метод asBroadcastStream() для создания широковещательного потока из одной подписки. Широковещательные потоки работают так же, как потоки с одной подпиской, но у них может быть несколько слушателей, и если никто не слушает, когда часть данных готова, эти данные выбрасываются.

Давайте вернемся к первому listen() звонку, потому что есть еще пара вещей, о которых стоит поговорить.

Как мы упоминали ранее, потоки могут вызывать ошибки точно так же, как и фьючерсы. Добавив функцию onError к вызову listen(), вы можете поймать и обработать любую ошибку.

Также есть свойство cancelOnError, которое по умолчанию имеет значение true, но может иметь значение false, чтобы подписка продолжалась даже после ошибки.

И вы можете добавить функцию onDone для выполнения некоторого кода, когда поток завершает отправку данных, например, когда файл был полностью прочитан.

Комбинируя все четыре этих параметра - onError, onDone, cancelOnError и обязательный параметр (onData), вы можете быть готовы заранее ко всему, что произойдет.

Совет. Небольшой объект подписки, который возвращает listen(), имеет несколько собственных полезных методов. Это StreamSubscription, и вы можете использовать его для приостановки, возобновления и даже отмены потока данных.

Использование и управление потоками

Теперь, когда вы знаете, как использовать listen() для подписки на поток и получения событий данных, мы можем поговорить о том, что делает потоки действительно крутыми: манипулировать ими. Как только у вас появятся данные в потоке, многие операции станут удобными и элегантными.

Возвращаясь к тому числовому потоку из предыдущего, мы можем использовать метод под названием map(), чтобы взять каждое значение из потока и на лету преобразовать его во что-то другое. Дайте map() функцию для выполнения преобразования, и она вернет новый поток, набранный в соответствии с возвращаемым значением функции. Вместо потока целых чисел теперь у вас есть поток строк. Вы можете бросить вызов listen() в конце, присвоить ему функцию print(), и теперь вы печатаете строки прямо из потока, асинхронно, по мере их поступления.

Есть масса методов, которые можно объединить в эту цепочку. Например, если вы хотите распечатать только четные числа, вы можете использовать where() для фильтрации потока. Дайте ему тестовую функцию, которая возвращает логическое значение для каждого элемента и возвращает новый поток, который включает только значения, прошедшие проверку.

Еще один хороший метод distinct(). Если у вас есть приложение, которое использует хранилище Redux, это хранилище генерирует новые объекты состояния приложения в потоке onChange. Вы можете использовать map() для преобразования этого потока объектов состояния в поток моделей представления для одной части приложения. Затем вы можете использовать метод distinct(), чтобы получить поток, который отфильтровывает последовательные идентичные значения (в случае, если магазин вносит изменение, не влияющее на подмножество данных в модели представления). Затем вы можете прослушивать и обновлять пользовательский интерфейс всякий раз, когда получаете новую модель представления.

В Dart встроено множество дополнительных методов, которые можно использовать для формирования и изменения потоков. Кроме того, когда вы будете готовы к более сложным вещам, есть асинхронный пакет, поддерживаемый командой Dart и доступный на pub.dev. В нем есть классы, которые могут объединять два потока вместе, кэшировать результаты и выполнять другие типы потокового волшебства.

Для еще большей магии потока взгляните на пакет stream_transform.

Создание потоков

Здесь стоит упомянуть одну сложную тему, а именно, как создавать собственные потоки. Как и в случае с Futures, большую часть времени вы будете работать с потоками, созданными для вас сетевыми библиотеками, библиотеками файлов, управлением состояниями и т. Д. Но вы также можете сделать свой собственный, используя StreamController.

Давайте вернемся к тому NumberCreator, которым мы пользовались до сих пор. Вот реальный код для этого:

Как видите, он ведет текущий счетчик и использует таймер для увеличения этого счетчика каждую секунду. Однако интересным моментом является контроллер потока.

StreamController создает новый поток с нуля и дает вам доступ к обоим его концам. Вот и сам конец потока, куда поступают данные. (Мы использовали его на протяжении всей статьи.)

Stream<int> get stream => _controller.stream;

Затем идет приемник, на котором в поток добавляются новые данные:

_controller.sink.add(_count);

NumberCreator здесь использует их обоих. Когда таймер срабатывает, он добавляет последний счетчик в приемник контроллера, а затем предоставляет поток контроллера с общедоступным свойством, чтобы другие объекты могли подписаться на него.

Создание виджетов Flutter с использованием потоков

Теперь, когда мы рассмотрели создание, управление и прослушивание потоков, давайте поговорим о том, как заставить их работать, создавая виджеты во Flutter.

Если вы видели предыдущий видеоролик о фьючерсах, возможно, вы вспомнили FutureBuilder. Вы даете ему future и метод построения, и он строит виджеты на основе состояния будущего.

Для потоков есть похожий виджет под названием StreamBuilder. Дайте ему поток и метод построения, и он будет перестраивать свои дочерние элементы всякий раз, когда потоком генерируется новое значение.

Параметр моментального снимка - AsyncSnapshot, как и FutureBuilder. Вы можете проверить его свойство connectionState, чтобы узнать, не отправил ли поток еще какие-либо данные или он полностью завершен. Вы можете использовать свойство hasError, чтобы узнать, является ли последнее значение ошибкой. И, конечно же, вы можете обрабатывать значения данных.

Главное - просто убедиться, что ваш конструктор умеет обрабатывать все возможные состояния потока. Как только вы это получите, он сможет реагировать на все, что делает поток.

Резюме

В этой статье говорилось о том, что представляют собой потоки, как вы получаете значения из потока, способы управления этими значениями и как StreamBuilder помогает вам использовать значения потока в приложении Flutter.

Вы можете узнать больше о потоках из документации Dart и Flutter:

Или перейдите к следующему видео из серии Асинхронное программирование в Dart. В нем говорится о async и await, двух ключевых словах, которые предлагает Dart, чтобы помочь вам сохранить ваш асинхронный код сжатым и легким для чтения.

Большое спасибо Эндрю Брогдону, создавшему видео, на котором основана эта статья.