Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like `map`, except that `convert` may return a `Future`.
In that case, the stream waits for that future to complete and then
continues with the future's result.
The returned stream is a broadcast stream if this stream is.
Source
/// Creates a new stream with each data event of this stream asynchronously
/// mapped to a new event.
///
/// Acts like [map], except that [convert] may return a [Future]. In that
/// case the subscription on this stream is paused until the future
/// completes, and the future's outcome (value or error) is emitted on the
/// returned stream before the subscription is resumed.
///
/// An error thrown synchronously by [convert] is emitted as an error event
/// on the returned stream. Error events of this stream are forwarded, and
/// the returned stream is closed when this stream is done.
///
/// The returned stream is a broadcast stream if this stream is.
Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) {
  StreamController/*<E>*/ controller;
  StreamSubscription/*<T>*/ subscription;

  // Passed as the controller's onListen callback, so this stream is only
  // subscribed to once the returned stream is listened to.
  void onListen() {
    final add = controller.add;
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    // NOTE(review): the cast relies on both controller implementations
    // checked above being _EventSinks; the private _addError is used for
    // forwarding errors instead of the public addError -- confirm why.
    final _EventSink/*<E>*/ eventSink =
        controller as Object /*=_EventSink<E>*/;
    final addError = eventSink._addError;
    subscription = this.listen(
      (T event) {
        dynamic newValue;
        try {
          newValue = convert(event);
        } catch (e, s) {
          // A synchronous failure of convert becomes an error event.
          controller.addError(e, s);
          return;
        }
        if (newValue is Future) {
          // Hold back further source events until the future has
          // completed and its outcome has been forwarded.
          subscription.pause();
          newValue.then(add, onError: addError)
              .whenComplete(subscription.resume);
        } else {
          controller.add(newValue as Object/*=E*/);
        }
      },
      onError: addError,
      onDone: controller.close
    );
  }

  if (this.isBroadcast) {
    controller = new StreamController/*<E>*/.broadcast(
      onListen: onListen,
      onCancel: () { subscription.cancel(); },
      sync: true
    );
  } else {
    controller = new StreamController/*<E>*/(
      onListen: onListen,
      onPause: () { subscription.pause(); },
      onResume: () { subscription.resume(); },
      // Return the cancel future so that cancelling the returned stream's
      // subscription also waits for the source subscription's clean-up
      // instead of silently dropping that future.
      onCancel: () => subscription.cancel(),
      sync: true
    );
  }
  return controller.stream;
}