asyncExpand<E> method
Transforms each element into a sequence of asynchronous events.
Returns a new stream and, for each event of this stream, does the following:
- If the event is an error event or a done event, it is emitted directly by the returned stream.
- Otherwise it is an element. Then the convert function is called with the element as argument to produce a convert-stream for the element.
  - If that call throws, the error is emitted on the returned stream.
  - If the call returns null, no further action is taken for that element.
  - Otherwise, this stream is paused and the convert-stream is listened to. Every data and error event of the convert-stream is emitted on the returned stream in the order it is produced. When the convert-stream ends, this stream is resumed.
The returned stream is a broadcast stream if this stream is.
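The pause-and-resume behavior described above means that the events of one convert-stream are never interleaved with those of the next. The following is a minimal sketch, assuming a recent Dart SDK; the helper names twice and collectExpanded are hypothetical and exist only for illustration.

```dart
import 'dart:async';

/// Hypothetical convert function: emits [n] twice, waiting briefly
/// before each event so the convert-stream is genuinely asynchronous.
Stream<int> twice(int n) async* {
  for (var i = 0; i < 2; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    yield n;
  }
}

/// Expands each source element with [twice] and collects the result.
Future<List<int>> collectExpanded() {
  return Stream<int>.fromIterable([1, 2, 3]).asyncExpand(twice).toList();
}

Future<void> main() async {
  // The source is paused while each convert-stream runs, so the
  // output keeps the source order.
  print(await collectExpanded()); // [1, 1, 2, 2, 3, 3]
}
```

Because the source stream is paused until each convert-stream is done, a slow convert-stream delays all later elements rather than running concurrently with them.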
Implementation
Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
  StreamController<E> controller;
  StreamSubscription<T> subscription;
  void onListen() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    final _EventSink<E> eventSink = controller as Object;
    subscription = this.listen((T event) {
      Stream<E> newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    },
        onError: eventSink._addError, // Avoid Zone error replacement.
        onDone: controller.close);
  }

  if (this.isBroadcast) {
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}
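The try/catch around the convert call in the listing above forwards a thrown error to the returned stream and then returns, leaving the source subscription active. The sketch below illustrates that observable behavior from the caller's side, assuming a recent Dart SDK; failOnTwo and collectWithErrors are hypothetical names, not part of the library.

```dart
import 'dart:async';

/// Hypothetical convert function: throws for 2, otherwise produces a
/// single-event convert-stream containing the element.
Stream<int> failOnTwo(int n) {
  if (n == 2) throw StateError('cannot expand 2');
  return Stream<int>.value(n);
}

/// Listens to the expanded stream and records every data and error event.
Future<List<String>> collectWithErrors() async {
  final log = <String>[];
  final done = Completer<void>();
  Stream<int>.fromIterable([1, 2, 3]).asyncExpand(failOnTwo).listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error'),
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  // The error from element 2 is delivered as an error event, and
  // element 3 is still processed afterwards.
  print(await collectWithErrors()); // [data 1, error, data 3]
}
```

The stream continues after the error only because listen is called with the default cancelOnError of false; a listener that cancels on error would not see the third element.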