Flutter において Dart の Stream はなくてはならない存在。たとえ開発者が意図的に使っていなかったとしても、ステート管理ライブラリの屋台骨は Stream が支えています。
StreamController()
#
StreamController() でコントローラを生成し、それに sink.add() することでストリームにイベントを流すことができます。コントローラを stream.listen() して、コールバック関数を定義することで、流れてきたイベントを捕捉することができます。stream.listen() に onError()、onDone() を定義することで、それぞれエラー時のイベント、ストリームが閉じられる時のイベントを捕捉することができます。
import 'dart:async';
main() {
final controller = StreamController();
controller.stream.listen(
(value) {
print(value);
},
onError: (error) {
print(error);
},
onDone: () {
print("StreamController was closed.");
},
);
controller.sink.add("Hello, World!");
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.addError(new Exception("Something went wrong."));
controller.sink.add(4);
controller.sink.add(5);
controller.sink.add(6);
controller.close();
// controller.sink.add("Oh my gosh!");
//
// もしコントローラーがクローズされた後に上記を実行するとエラーが発生します。
// Unhandled exception:
// Bad state: Cannot add event after closing
}
出力結果
Hello, World!
1
2
3
Exception: Something went wrong.
4
5
6
Stream was closed.
StreamController.broadcast()
#
Stream には “Single subscription stream” と “Broadcast stream” の2種類があります。上記で見たものは “Single subscription stream” です。
- Single subscription stream:一度しか
listen()できない。複数回listen()しようとするとエラーとなる。 - Broadcast stream:複数ヶ所でも
listen()できる。
Broadcast stream を利用するには、次のように broadcast() コンストラクタで生成します。
import 'dart:async';
main() {
final controller = StreamController.broadcast();
controller.stream.listen((value) {
print("1: " + value);
});
controller.stream.listen((value) {
print("2: " + value);
});
controller.stream.listen((value) {
print("3: " + value);
});
controller.sink.add("Alice");
controller.sink.add("Bob");
controller.sink.add("Charlie");
controller.close();
}
出力結果
1: Alice
2: Alice
3: Alice
1: Bob
2: Bob
3: Bob
1: Charlie
2: Charlie
3: Charlie
ストリームを返す関数定義 #
通常の関数と比較して、ストリームを返す関数の特徴は次の3つです。
- 戻り値の型が
Stream<T>であること async*を記述すること- 関数内で
yieldを用いること
関数を呼び出して使う側は、生成されたストリームに対して listen() することで捕捉できます。
import 'dart:async';
void main() {
Stream<int> generateCountStream() async* {
int i = 0;
while (true) {
await Future.delayed(Duration(seconds: 1));
i++;
yield i;
}
}
final countStream = generateCountStream();
countStream.listen((int value) {
print(value);
});
}
出力結果
0
1
2
3
4
5
6
7
8
9
10
...
..
.
ストリームを返す関数定義(asBroadcastStream())
#
Broadcast stream を生成するには asBroadcastStream() を呼び出します。これによって、複数回 listen() することが可能となります。
import 'dart:async';
void main() {
Stream<int> generateCountStream() async* {
int i = 0;
while (true) {
await Future.delayed(Duration(seconds: 1));
i++;
yield i;
}
}
final countStream = generateCountStream().asBroadcastStream();
countStream.listen((int value) {
print("A: " + value.toString());
});
countStream.listen((int value) {
print("B: " + value.toString());
});
}
出力結果
A: 0
B: 0
A: 1
B: 1
A: 2
B: 2
A: 3
B: 3
A: 4
B: 4
A: 5
B: 5
...
..
.
ストリームを読みつつ完了を待つ await for
#
ストリームに対して await for を使うことで listen() 同様に値を捕捉することができます。ただし以下の違いがあります。
listen():ストリームの完了を待たずに次の行に処理を進めるawait for:ストリームの完了を待ってから次の行に処理を進める
次のコードを実行した場合 print("Hi!"); の行に処理が到達することはありません。
import 'dart:async';
void main() async {
Stream<int> generateCountStream() async* {
int i = 0;
while (true) {
await Future.delayed(Duration(seconds: 1));
i++;
yield i;
}
}
final countStream = generateCountStream();
await for (final value in countStream) {
print(value);
}
print("Hi!"); // 👈 ここに処理が到達することはない
}
上記の await for を listen() にした場合は、最初に print("Hi!"); が実行完了して “Hi!” が出力されます。その後に listen() 内の print(value); が実行され、1, 2, 3 ... と数が出力されていくことになります。
値を呼び出し元に返しつつ処理を継続する yield
#
最後に、先ほど出てきた yield について説明します。これは async* と合わせて使用するものです。すなわちストリームの関数内でのみ用いることができます。
通常、関数は return すると処理を終えて破棄されますが、yield は呼び出し元に値を返しつつ、yield の次の行からそのまま処理を進めます。yield は C# では yield return と呼ばれているようです。
import 'dart:async';
void main() async {
Stream<int> generateCountStream() async* {
print("yield: 1");
yield 1;
print("yield: 2");
yield 2;
print("yield: 3");
yield 3;
}
final countStream = generateCountStream();
countStream.listen((value) {
print("listen: ${value}");
});
}
出力結果
yield: 1
listen: 1
yield: 2
listen: 2
yield: 3
listen: 3