Dart: Stream の使い方

Dart: Stream の使い方

Flutter において Dart の Stream はなくてはならない存在。たとえ開発者が意図的に使っていなかったとしても、ステート管理ライブラリの屋台骨は Stream が支えています。

StreamController() #

StreamController() でコントローラを生成し、それに sink.add() することでストリームにイベントを流すことができます。コントローラを stream.listen() して、コールバック関数を定義することで、流れてきたイベントを捕捉することができます。stream.listen()onError()onDone() を定義することで、それぞれエラー時のイベント、ストリームが閉じられる時のイベントを捕捉することができます。

import 'dart:async';

main() {
  final controller = StreamController();

  controller.stream.listen(
    (value) {
      print(value);
    },
    onError: (error) {
      print(error);
    },
    onDone: () {
      print("StreamController was closed.");
    },
  );

  controller.sink.add("Hello, World!");
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.addError(new Exception("Something went wrong."));
  controller.sink.add(4);
  controller.sink.add(5);
  controller.sink.add(6);
  controller.close();

  // controller.sink.add("Oh my gosh!");
  //
  // もしコントローラーがクローズされた後に上記を実行するとエラーが発生します。
  // Unhandled exception:
  // Bad state: Cannot add event after closing
}

出力結果

Hello, World!
1
2
3
Exception: Something went wrong.
4
5
6
Stream was closed.

StreamController.broadcast() #

Stream には “Single subscription stream” と “Broadcast stream” の2種類があります。上記で見たものは “Single subscription stream” です。

  • Single subscription stream:一度しか listen() できない。複数回 listen() しようとするとエラーとなる。
  • Broadcast stream:複数ヶ所でも listen() できる。

Broadcast stream を利用するには、次のように broadcast() コンストラクタで生成します。

import 'dart:async';

main() {
  final controller = StreamController.broadcast();

  controller.stream.listen((value) {
    print("1: " + value);
  });

  controller.stream.listen((value) {
    print("2: " + value);
  });

  controller.stream.listen((value) {
    print("3: " + value);
  });

  controller.sink.add("Alice");
  controller.sink.add("Bob");
  controller.sink.add("Charlie");

  controller.close();
}

出力結果

1: Alice
2: Alice
3: Alice
1: Bob
2: Bob
3: Bob
1: Charlie
2: Charlie
3: Charlie

ストリームを返す関数定義 #

通常の関数と比較して、ストリームを返す関数の特徴は次の3つです。

  • 戻り値の型が Stream<T> であること
  • async* を記述すること
  • 関数内で yield を用いること

関数を呼び出して使う側は、生成されたストリームに対して listen() することで捕捉できます。

import 'dart:async';

void main() {
  Stream<int> generateCountStream() async* {
    int i = 0;
    while (true) {
      await Future.delayed(Duration(seconds: 1));
      i++;
      yield i;
    }
  }

  final countStream = generateCountStream();

  countStream.listen((int value) {
    print(value);
  });
}

出力結果

0
1
2
3
4
5
6
7
8
9
10
...
..
.

ストリームを返す関数定義(asBroadcastStream()#

Broadcast stream を生成するには asBroadcastStream() を呼び出します。これによって、複数回 listen() することが可能となります。

import 'dart:async';

void main() {
  Stream<int> generateCountStream() async* {
    int i = 0;
    while (true) {
      await Future.delayed(Duration(seconds: 1));
      i++;
      yield i;
    }
  }

  final countStream = generateCountStream().asBroadcastStream();

  countStream.listen((int value) {
    print("A: " + value.toString());
  });

  countStream.listen((int value) {
    print("B: " + value.toString());
  });
}

出力結果

A: 0
B: 0
A: 1
B: 1
A: 2
B: 2
A: 3
B: 3
A: 4
B: 4
A: 5
B: 5
...
..
.

ストリームを読みつつ完了を待つ await for #

ストリームに対して await for を使うことで listen() 同様に値を捕捉することができます。ただし以下の違いがあります。

  • listen():ストリームの完了を待たずに次の行に処理を進める
  • await for:ストリームの完了を待ってから次の行に処理を進める

次のコードを実行した場合 print("Hi!"); の行に処理が到達することはありません。

import 'dart:async';

void main() async {
  Stream<int> generateCountStream() async* {
    int i = 0;
    while (true) {
      await Future.delayed(Duration(seconds: 1));
      i++;
      yield i;
    }
  }

  final countStream = generateCountStream();

  await for (final value in countStream) {
    print(value);
  }

  print("Hi!"); // 👈 ここに処理が到達することはない
}

上記の await forlisten() にした場合は、最初に print("Hi!"); が実行完了して “Hi!” が出力されます。その後に listen() 内の print(value); が実行され、1, 2, 3 ... と数が出力されていくことになります。

値を呼び出し元に返しつつ処理を継続する yield #

最後に、先ほど出てきた yield について説明します。これは async* と合わせて使用するものです。すなわちストリームの関数内でのみ用いることができます。

通常、関数は return すると処理を終えて破棄されますが、yield は呼び出し元に値を返しつつ、yield の次の行からそのまま処理を進めます。yieldC# では yield return と呼ばれているようです。

import 'dart:async';

void main() async {
  Stream<int> generateCountStream() async* {
    print("yield: 1");
    yield 1;
    print("yield: 2");
    yield 2;
    print("yield: 3");
    yield 3;
  }

  final countStream = generateCountStream();

  countStream.listen((value) {
    print("listen: ${value}");
  });
}

出力結果

yield: 1
listen: 1
yield: 2
listen: 2
yield: 3
listen: 3