dynamic convert(T event)
) Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like map, except that convert
may return a Future,
and in that case, the stream waits for that future to complete before
continuing with its result.
The returned stream is a broadcast stream if this stream is.
Source
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
*
* This acts like [map], except that [convert] may return a [Future],
* and in that case, the stream waits for that future to complete before
* continuing with its result.
*
* The returned stream is a broadcast stream if this stream is.
*/
Stream asyncMap(convert(T event)) {
StreamController controller;
StreamSubscription subscription;
void onListen () {
final add = controller.add;
assert(controller is _StreamController ||
controller is _BroadcastStreamController);
final eventSink = controller;
final addError = eventSink._addError;
subscription = this.listen(
(T event) {
var newValue;
try {
newValue = convert(event);
} catch (e, s) {
controller.addError(e, s);
return;
}
if (newValue is Future) {
subscription.pause();
newValue.then(add, onError: addError)
.whenComplete(subscription.resume);
} else {
controller.add(newValue);
}
},
onError: addError,
onDone: controller.close
);
}
if (this.isBroadcast) {
controller = new StreamController.broadcast(
onListen: onListen,
onCancel: () { subscription.cancel(); },
sync: true
);
} else {
controller = new StreamController(
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () { subscription.cancel(); },
sync: true
);
}
return controller.stream;
}