В этой статье рассматривается одна из основ реактивного программирования: потоки, которые являются объектами типа Stream.

Если вы читали наши предыдущие статьи о фьючерсах, возможно, вы помните, что каждое фьючерс представляет собой одно значение (либо ошибку, либо данные), доставляемое асинхронно. Потоки работают аналогично, только вместо одного элемента поток может предоставить ноль или более значений и ошибокв течение времени.

Эта статья была впервые опубликована в Февраль 2020. Эта версия обновляет включенный код до нулевой безопасности.

Эта статья — третья, основанная на серии видео Flutter in Focus Асинхронное программирование в Dart. Первая статья, Изоляты и циклы событий, посвящена основам поддержки Dart для фоновой работы. Второй, Futures, обсуждает класс Future.

Если вы предпочитаете учиться, наблюдая или слушая, все, что описано в этой статье, описано в следующем видео.

Примечание. Код в этой статье был обновлен, чтобы отразить рекомендации и изменения в языке Dart (включая нулевую безопасность), произошедшие после выпуска видео 28 июня 2019 г.

Если вы думаете о том, как одиночное значение относится к итератору того же типа, вот как будущее относится к потоку: будущее представляет собой один запрос с одним ответом, а поток представляет собой один запрос с несколькими ответами. .

Как и в случае с фьючерсами, ключ заключается в том, чтобы заранее решить, что делать: 1) когда часть данных будет готова, 2) когда возникнет ошибка и 3) когда поток завершится. Как и в случае с фьючерсами, в этом процессе цикл событий Dart по-прежнему управляет шоу.

Защемление цикла событий

Например, если вы используете метод openRead() класса File для чтения данных из файла, этот метод возвращает поток.

Куски данных считываются с диска и поступают в цикл обработки событий. Библиотека Dart смотрит на них и говорит: «А, кто-то этого ждет», добавляет данные в поток и отправляет их вашему приложению.

Когда поступает другой фрагмент данных — он входит и выходит. Потоки на основе таймера и потоки данных из сетевого сокета также работают с циклом событий, используя часы и сетевые события.

Прослушивание потоков

Следующее, что нужно понять, это как работать с данными, предоставляемыми потоком.

Скажем, у вас есть класс, который дает вам поток, который выдает новое целое число раз в секунду (1, 2, 3, 4, 5…). Вы можете использовать метод listen() для подписки на поток. Единственным обязательным параметром является функция.

final myStream = NumberCreator().stream;
final subscription = myStream.listen(
    (data) => print(‘Data: $data’),
);

Каждый раз, когда поток выдает новое значение, функция вызывается и печатает значение:

Data: 1
Data: 2
Data: 3
Data: 4
...

Вот как работает listen().

Важно! По умолчанию потоки настроены на одну подписку. Они придерживаются своих ценностей до тех пор, пока кто-нибудь не подпишется, и они допускают только одного слушателя на всю свою жизнь. Если вы попытаетесь прослушать поток дважды, вы получите исключение.

К счастью, Dart также предлагает широковещательные потоки. Вы можете использовать метод asBroadcastStream(), чтобы сделать широковещательный поток из одной подписки. Широковещательные потоки работают так же, как потоки с одной подпиской, но у них может быть несколько слушателей.

Еще одно отличие широковещательных потоков:если никто не слушает, когда часть данных готова, эти данные отбрасываются.

final myStream = NumberCreator().stream;
final subscription = myStream.listen(
  (data) => print(‘Data: $data’),
);
final subscription2 = myStream.listen(
  (data) => print(‘Data again: $data’),
);

Давайте вернемся к тому первому listen()звонку, потому что есть еще пара вещей, о которых стоит поговорить.

Как упоминалось ранее, потоки могут вызывать ошибки точно так же, как и фьючерсы. Добавив функцию onError к вызову listen(), вы сможете перехватывать и обрабатывать любые ошибки.

Существует также свойство cancelOnError, для которого по умолчанию установлено значение true, но его можно установить в значение false, чтобы подписка продолжалась даже после ошибки.

Вы можете добавить функцию onDone для выполнения некоторого кода, когда поток завершит отправку данных, например, когда файл будет полностью прочитан.

С учетом всех четырех параметров — onError, onDone, cancelOnError и обязательного параметра (onData) — вы можете быть готовы к любым событиям заранее.

final myStream = NumberCreator().stream;
final subscription = myStream.listen(
  (data){
    print(‘Data: $data’);
},
onError: (err) {
  print(‘Error!’);
},
cancelOnError: false,
onDone: () {
  print(‘Done!’):
 },
);

Совет. Объект, который возвращает listen(), имеет несколько собственных полезных методов. Он называется StreamSubscription, и вы можете использовать его для приостановки, возобновления и даже отмены потока данных.

final subscription = myStream.listen(…);
subscription.pause();
subscription.resume();
subscription.cancel();

Использование потоков и управление ими

Теперь, когда вы знаете, как использовать listen() для подписки на поток и получения событий данных, давайте поговорим о том, что делает потоки действительно крутыми: об управлении ими.

Как только у вас есть данные в потоке, многие операции внезапно становятся плавными и элегантными.

Вернемся к этому числовому потоку ранее.

С помощью метода map() вы можете брать каждое значение из потока и на лету преобразовывать его во что-то другое. Дайте map() функцию для преобразования, и она вернет новый поток, тип которого соответствует возвращаемому значению функции.

Вместо потока целых чисел теперь есть поток строк. Бросьте listen()call в конце, передайте ему print()function, и теперь он печатает строки прямо из потока — асинхронно, по мере их поступления.

NumberCreator().stream
    .map((i) => ‘String $i’)
    .listen(print) ;
String 1
String 2
String 3
String 4
*/

Есть много методов, которые вы можете связать таким образом. Например, если вы хотите печатать только четные числа, вы можете использовать where() для фильтрации потока. Дайте ему тестовую функцию, которая возвращает логическое значение для каждого элемента, и она возвращает новый поток, который включает только те значения, которые проходят тест.

NumberCreator().stream
    .where((i) => i % 2 == 0)
    .map((i) => ‘String $i’)
    .listen(print) ;
String 2
String 4
String 6
String 8

Метод distinct() — еще один хороший. С приложением, которое использует хранилище Redux, это хранилище создает новые объекты состояния приложения в потоке onChange.

Вы можете использовать map() для преобразования потока объектов состояния в поток моделей представления для одной части приложения. Затем вы можете использовать метод distinct(), чтобы получить поток, который отфильтровывает последовательные идентичные значения (на случай, если хранилище выкинет изменение, которое не влияет на подмножество данных в модели представления).

Затем вы можете слушать и обновлять пользовательский интерфейс всякий раз, когда вы получаете новую модель представления.

myReduxStore.onChange
    .map((s) => MyViewModel(s))
    .distinct()
    .listen( /* update UI */ )

В Dart встроены дополнительные методы, которые можно использовать для формирования и изменения потоков. Кроме того, когда вы будете готовы к более продвинутым функциям, есть асинхронный пакет, поддерживаемый командой Dart и доступный на pub.dev. У него есть классы, которые могут объединять два потока вместе, кэшировать результаты и выполнять другие типы волшебства на основе потоков.

Чтобы узнать больше о потоковой магии, взгляните на пакет stream_transform.

Создание потоков

Наконец, еще одна сложная тема, заслуживающая упоминания здесь, — это создание собственных потоков.

Как и в случае с фьючерсами, большую часть времени вы будете работать с потоками, созданными для вас сетевыми библиотеками, файловыми библиотеками, системой управления состоянием и т. д., но вы можете создавать свои собственные с помощью файла StreamController.

Давайте вернемся к тому примеру NumberCreator, который мы использовали до сих пор. Вот фактический код для него:

class NumberCreator {
  NumberCreator() {
    Timer.periodic(const Duration(seconds: 1), (timer) {
      _controller.sink.add(_count);
     _count += 1;
   });
 }
 final _controller = StreamController<int>();
 var _count = 0;
 Stream<int> get stream => _controller.stream;
}

Как видите, он ведет текущий счетчик и использует таймер для увеличения этого счетчика каждую секунду. Однако интересным моментом является контроллер потока.

StreamController создает совершенно новый поток с нуля и дает вам доступ к обоим его концам. Есть сам конец потока, куда поступают данные. (Мы использовали его на протяжении всей статьи.)

Stream<int> get stream => _controller.stream;

И есть конец приемника, где новые данные добавляются в поток:

_controller.sink.add(_count);

NumberCreator использует их оба. Когда таймер срабатывает, он добавляет последний счетчик в приемник контроллера, а затем предоставляет поток контроллера с общедоступным свойством, чтобы другие объекты могли подписаться на него.

Создание виджетов Flutter с использованием потоков

Теперь, когда мы рассмотрели создание, управление и прослушивание потоков, давайте поговорим о том, как заставить их работать при создании виджетов во Flutter.

Если вы читали предыдущую статью о фьючерсах, возможно, вы помните FutureBuilder. Вы даете ему будущее и метод построения, и он создает виджеты на основе состояния будущего.

Для потоков есть аналогичный виджет под названием StreamBuilder. Дайте ему поток, подобный потоку от создателя чисел и метода построителя, и он перестраивает своих дочерних элементов всякий раз, когда поток выдает новое значение.

StreamBuilder<String>(
  stream: NumberCreator().stream.map((i) => ‘String $i’),
  builder: (context, snapshot) {
    // Build some widgets
    throw UnimplementedError(“Case not handled yet”);
  },
);

Параметр снимка — AsyncSnapshot, как и FutureBuilder.

StreamBuilder<String>(
  stream: NumberCreator().stream.map((i) => ‘String $i’),
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.waiting) {
      return const Text(‘No data yet.’);
  }
  throw UnimplementedError(“Case not handled yet”);
},
);

Вы можете проверить его connectionState property, чтобы увидеть, не отправил ли поток еще какие-либо данные или он полностью завершен.

StreamBuilder<String>(
   stream: NumberCreator().stream.map((i) => 'String $i'),
   builder: (context, snapshot) {
      if (snapshot.connectionState == ConnectionState.waiting) {      
        return const Text('No data yet.');
      } else if (snapshot.connectionState == ConnectionState.done){
        return const Text('Done!');
      }
      throw UnimplementedError("Case not handled yet");
    },
 );

Вы можете использовать свойство hasError для обработки значений данных и проверки того, является ли последнее значение ошибкой.

StreamBuilder<String>(
  stream: NumberCreator().stream.map((i) => ‘String $i’),
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.waiting) {
      return const Text(‘No data yet.’);
    } else if (snapshot.connectionState == ConnectionState.done) {
      return const Text(‘Done!’);
    } else if (snapshot.hasError) {
      return const Text(‘Error!’);
    } else {
      return Text(snapshot.data ?? ‘’);
    } 
  },
);

Главное — убедиться, что ваш билдер знает, как обрабатывать все возможные состояния потока. Как только вы это сделаете, он сможет реагировать на все, что делает поток. (Дополнительную информацию, в том числе об экземпляре DartPad, с которым вы можете играть, см. на странице StreamBuilder API.)

Краткое содержание

В этой статье рассказывается о том, что представляют собой потоки, как получать значения из потока, как манипулировать этими значениями и как StreamBuilder помогает использовать значения потоков в приложении Flutter.

Вы можете узнать больше о потоках из документации Dart и Flutter:

Следите за обновлениями, чтобы не пропустить новые статьи из этой серии. Далее мы поговорим о async и await. Это два ключевых слова, которые предлагает Dart, чтобы помочь вам сделать ваш асинхронный код кратким и легко читаемым.

А пока вы можете посмотреть следующую серию видеороликов на Асинхронном программировании в Dartна нашем канале YouTube или зайти на наши веб-сайты для получения дополнительной информации о Дротик и Флаттер.