Creates a new stream with the events of a stream per original event.
This acts like expand, except that convert
returns a Stream
instead of an Iterable.
The events of each stream returned by convert become the events of the
returned stream, in the order they are produced.
If convert returns null, no value is put on the output stream,
just as if it returned an empty stream.
The returned stream is a broadcast stream if this stream is.
Source
/// Transforms each data event of this stream into a stream of events.
///
/// Calls [convert] once for every data event of this stream and forwards
/// the events of the stream it returns to the returned stream. The
/// subscription on this stream is paused while a converted stream is being
/// forwarded and is resumed when that forwarding completes.
///
/// If [convert] returns `null`, nothing is emitted for that event. If
/// [convert] throws, the error is added to the returned stream instead.
/// Errors of this stream are forwarded to the returned stream, and the
/// returned stream is closed when this stream is done.
///
/// The returned stream is a broadcast stream if this stream is.
Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
  StreamController/*<E>*/ controller;
  StreamSubscription<T> subscription;

  // Starts listening on this stream once the returned stream gets a listener.
  void onListen() {
    assert(controller is _StreamController ||
           controller is _BroadcastStreamController);
    // The controller is used through its internal sink interface so that
    // source errors can be forwarded with `_addError` below.
    final _EventSink/*<E>*/ eventSink =
        controller as Object /*=_EventSink<E>*/;
    subscription = this.listen(
        (T event) {
          Stream/*<E>*/ newStream;
          try {
            newStream = convert(event);
          } catch (e, s) {
            // A throwing `convert` becomes an error event on the output.
            controller.addError(e, s);
            return;
          }
          if (newStream != null) {
            // Hold back further source events until every event of the
            // converted stream has been forwarded.
            subscription.pause();
            controller.addStream(newStream)
                      .whenComplete(subscription.resume);
          }
        },
        onError: eventSink._addError,  // Avoid Zone error replacement.
        onDone: controller.close
    );
  }

  if (this.isBroadcast) {
    controller = new StreamController/*<E>*/.broadcast(
      onListen: onListen,
      onCancel: () { subscription.cancel(); },
      sync: true
    );
  } else {
    controller = new StreamController/*<E>*/(
      onListen: onListen,
      onPause: () { subscription.pause(); },
      onResume: () { subscription.resume(); },
      // Return the cancel future (it was previously dropped) so that
      // cancelling the returned stream can wait for the cancellation of the
      // source subscription to complete.
      onCancel: () => subscription.cancel(),
      sync: true
    );
  }
  return controller.stream;
}