22

Dart基础——如何在Dart&Flutter中使用Stream

 3 years ago
source link: https://my.oschina.net/u/4038200/blog/4864695
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1 什么是Stream?

Stream是Dart用来处理异步的API,和同样用来处理异步的Future不同的是,Stream可以异步的返回多个结果,而Future只能返回一个,如果你对Future有疑问,可以参考作者的上一篇文章,Dart基础——Dart异步Future与事件循环Event Loop

2 如何创建Stream?

1.1使用Stream的构造方法

Stream periodicStream = Stream.periodic(Duration(seconds: 2), (num) {
  return num;
});

periodic构造方法主要有两个参数,第一个参数类型为Duration(时间间隔),第二个参数类型为Function,Function每隔一个Duration(时间间隔)会被调用一次,参数num为事件调用的次数,从0开始依次递增。

翻阅源码 Stream.periodic是使用Timer.periodic加_SyncStreamController实现的

1.2将方法的返回值声明为Stream

Stream<String> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
   //延迟interval(时间间隔)执行一次
    await Future.delayed(interval);
    //返回i  i++
    yield "stream返回${i++}";
    if (i == maxCount) break;
  }
}

看到这里你可能会有一些疑问什么是async*和yield?

yield为一个用async *修饰返回值为Stream的函数返回一个值,它就像return,不过他不会结束函数

Stream asynchronousNaturalsTo(n) async* {
  int k = 0;
  while (k < n) yield k++;
}

这里涉及到了Dart的生成器函数概念,在这里你只需要简单理解yield的作用就可以了

1.3使用StreamController

  var _controller = StreamController<int>();

  var _count = 1;

  createStream() {
  //函数每隔一秒调用一次
    Timer.periodic(Duration(seconds: 1), (t) {
      _controller.sink.add(_count);
      _count++;
    });
  }

我们主要使用_controller的两个属性,使用_controller.Stream获取流,使用_controller.sink.add向流中添加数据,上面的例子使用定时器,每隔一秒向流中添加数据_count

3 Stream的常用方法

接下来介绍一下Stream的常用方法

PS:以下Stream常用方法的展示都是用下面代码创建的流

Stream periodicStream = Stream.periodic(Duration(seconds: 1), (num) {
  return num;
});

3.1 listen

listen作为使用Stream最重要的方法,主要用于监听流的数据变化,每当流的数据变化时,listen中的方法都会被调用。

    periodicStream.listen((event) {
      print(event);
    });

listen方法默认参数为Function,参数中的event为上面示例中返回的num,每当流返回新的数据时,listen方法都会被调用。

控制台输出如下

0
1
2
3
4

打印流返回的数据

listen的onError参数当流出现错误时调用。

listen的onDone参数当流关闭时调用。

还有一个cancelOnError属性,默认情况下为true,可以将其设置为false以使订阅在发生错误后也能继续进行。

3.2 map

Stream.periodic(Duration(seconds: 1), (num) {
    return num;
  }).map((num) => num * 2)

使用map将流返回的数据进行转换

控制台输出如下

0
2
4
6

3.3 asBroadcastStream()&broadcast

通过Stream的asBroadcastStream()或StreamController的broadcast将单订阅的流转换为多订阅流

什么是单订阅流和多订阅流?

3.3.1 单订阅流

单订阅流顾名思义,此流只能有一个订阅者,也就是单订阅流的listen方法只能被调用一次,当第二次调用单订阅流的listen时会报错,值得一提的是,当我们创建流时,默认创建的就是单订阅流。

3.3.2 多订阅流

顾名思义,此流可以有多个订阅者,也就是多订阅流的listen方法可以被多次调用,通过Stream的asBroadcastStream()或StreamController的broadcast将单订阅流转换为多订阅流。

创建多订阅流
Stream broadcastStream = Stream.periodic(Duration(seconds: 5), (num) {
  return num;
}).asBroadcastStream();
var _controller = StreamController<int>.broadcast()

3.3.3 单订阅流与多订阅流的区别

第一个区别

第一个区别就是上面提到的订阅者数量的区别

第二个区别

我们重点要谈论一下两种流的第二个区别

第二个区别就是单订阅流会持有自己的数据,当订阅者出现时将自身持有的数据全部返回给订阅者,而多订阅流不会持有任何数据,如果多订阅流没有订阅者,多订阅流会把数据丢弃。

下面我们用两端代码来展示两种流处理数据上的差别

单订阅流代码展示

  var _controller = StreamController<int>.broadcast();
  var _count = 1;

  createStream() {
    Timer.periodic(Duration(seconds: 1), (t) {
      _controller.sink.add(_count);
      _count++;
    });
  }
createStream();
Future.delayed(Duration(seconds: 5), () {
  _controller.stream.listen((event) {
    print("单订阅流$event");
    });
});

控制台输出如下

可以看到,单订阅流即使前五秒我们没有订阅,但单订阅流还是在持有数据,当订阅者出现时将持有的所有数据发送给订阅者。

多订阅流代码展示

  var _controller = StreamController<int>.broadcast();
  var _count = 1;

  createStream() {
    Timer.periodic(Duration(seconds: 1), (t) {
      _controller.sink.add(_count);
      _count++;
    });
  }
    createStream();
    Future.delayed(Duration(seconds: 5), () {
      _controller.stream.listen((event) {
        print("多订阅流$event");
      });
    });
    Future.delayed(Duration(seconds: 10), () {
      _controller.stream.listen((event) {
        print("多订阅流二$event");
      });
    });

控制台输出

可以看到多订阅流产生的前五条数据都被丢弃了,只有订阅者出现后生成的数据被发送给了订阅者。

代码看完想必你已经理解了单订阅流与多订阅流的第二种区别,我制作了两种流程图帮助你理解

3.4 其他方法

处理 Stream 的方法

下面这些 Stream 类中的方法可以对 Stream 进行处理并返回结果:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

4 管理流订阅

我们可以使用StreamSubscription对象来对流的订阅进行管理,listen方法的返回值就是StreamSubscription对象

  StreamSubscription subscription =
    Stream.periodic(Duration(seconds: 1), (num) {
    return num;
  }).listen((num) {
    print(num);
  });

4.1 暂停订阅

subscription.pause();

4.2 恢复订阅

subscription.resume();

4.3 取消订阅

subscription2.cancel();

当不需要监听流时记得调用这个方法,否则会造成内存泄漏

4.4 操作流订阅的例子

以下示例用来展示如何操作流订阅

  static var _controller = StreamController<int>();
  var _count = 1;
  createStream() {
    Timer.periodic(Duration(seconds: 1), (t) {
      _controller.sink.add(_count);
      _count++;
    });
  }

创建监听及监听管理对象

  StreamSubscription subscription2 = _controller.stream.listen((event) {
    print("单订阅流$event");
  });

操作流订阅的方法

createStream();
Future.delayed(Duration(seconds: 3), () {
  print("暂停");
  subscription2.pause();
});
Future.delayed(Duration(seconds: 5), () {
  print("继续");
  subscription2.resume();
});
Future.delayed(Duration(seconds: 7), () {
  print("取消");
  subscription2.cancel();
});

5 在Flutter中使用StreamBuilder组件

5.1 StreamBuilder组件介绍

StreamBuilder组件主要有两个参数

第一个参数stream,要订阅的流

第二个参数builder,widget构建函数

可以使用builder函数的snapshot.connectionState属性根据流的不同状态返回不同的组件

每当StreamBuilder监听的stream有数据变化时,builder函数就会被调用,组件重新构建。

5.2示例代码

import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:flutter_demo/util/util.dart';

/// Copyright (C), 2020-2020, flutter_demo
/// FileName: streamBuilder_demo
/// Author: Jack
/// Date: 2020/12/27
/// Description:
class StreamBuilderDemo extends StatelessWidget {
  //创建流
  Stream<int> _stream() {
    Duration interval = Duration(seconds: 1);
    Stream<int> stream = Stream<int>.periodic(interval, (num) {
      return num;
    });
    stream = stream.take(59);
    return stream;
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Demo'),
      ),
      body: Center(
        child: StreamBuilder(
          stream: _stream(),
          builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
            if (snapshot.connectionState == ConnectionState.done) {
              return Text(
                '1 Minute Completed',
                style: TextStyle(
                  fontSize: 30.0,
                ),
              );
            } else if (snapshot.connectionState == ConnectionState.waiting) {
              return Text(
                'Waiting For Stream',
                style: TextStyle(
                  fontSize: 30.0,
                ),
              );
            }
            return Text(
              '00:${snapshot.data.toString().padLeft(2, '0')}',
              style: TextStyle(
                fontSize: 30.0,
              ),
            );
          },
        ),
      ),
    );
  }
}

6 完整示例

上文所有的代码示例都在作者的GiuHub上,https://github.com/jack0-0wu/flutter_demo,里面还包含了一些常用flutter功能的展示。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK