Creates a new stream with the events of a stream per original event.
This acts like expand, except that convert returns a Stream instead of an Iterable.
The events of each stream returned by convert become the events of the returned
stream, in the order they are produced.
If convert returns null, no value is put on the output stream,
just as if it returned an empty stream.
The returned stream is a broadcast stream if this stream is.
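The behavior described above can be illustrated with a minimal sketch. It assumes a null-safe Dart SDK, where convert may be declared to return `Stream<E>?`; the helper names `duplicateOdd` and `expandDemo` are hypothetical and exist only for this example.

```dart
import 'dart:async';

/// Hypothetical convert callback: emits an odd event twice and
/// returns null for an even event.
Stream<int>? duplicateOdd(int event) {
  if (event.isEven) return null; // Treated like an empty stream.
  return Stream<int>.fromIterable([event, event]);
}

/// Expands each source event into a stream and collects the output.
Future<List<int>> expandDemo() {
  final source = Stream<int>.fromIterable([1, 2, 3]);
  return source.asyncExpand(duplicateOdd).toList();
}

Future<void> main() async {
  print(await expandDemo()); // [1, 1, 3, 3]
}
```

The event 2 contributes nothing to the output because convert returns null for it, while the events produced for 1 and 3 appear in source order.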
Source
Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
  StreamController/*<E>*/ controller;
  StreamSubscription<T> subscription;
  void onListen() {
    assert(controller is _StreamController ||
           controller is _BroadcastStreamController);
    final _EventSink/*<E>*/ eventSink =
        controller as Object /*=_EventSink<E>*/;
    subscription = this.listen(
        (T event) {
          Stream/*<E>*/ newStream;
          try {
            newStream = convert(event);
          } catch (e, s) {
            controller.addError(e, s);
            return;
          }
          if (newStream != null) {
            subscription.pause();
            controller.addStream(newStream)
                      .whenComplete(subscription.resume);
          }
        },
        onError: eventSink._addError,  // Avoid Zone error replacement.
        onDone: controller.close
    );
  }
  if (this.isBroadcast) {
    controller = new StreamController/*<E>*/.broadcast(
      onListen: onListen,
      onCancel: () { subscription.cancel(); },
      sync: true
    );
  } else {
    controller = new StreamController/*<E>*/(
      onListen: onListen,
      onPause: () { subscription.pause(); },
      onResume: () { subscription.resume(); },
      onCancel: () => subscription.cancel(),
      sync: true
    );
  }
  return controller.stream;
}
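The source listing pauses the subscription to this stream while an inner stream is being forwarded and resumes it when that stream completes. A small sketch of the visible effect, assuming a null-safe Dart SDK; `slowPair` and `sequentialDemo` are hypothetical names used only here.

```dart
import 'dart:async';

/// Hypothetical inner stream: emits [base], waits briefly,
/// then emits [base] + 1.
Stream<int> slowPair(int base) async* {
  yield base;
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield base + 1;
}

/// The second source event is not converted until the stream
/// produced for the first event has completed.
Future<List<int>> sequentialDemo() {
  return Stream<int>.fromIterable([10, 20]).asyncExpand(slowPair).toList();
}

Future<void> main() async {
  print(await sequentialDemo()); // [10, 11, 20, 21]
}
```

Even though each inner stream has a delay between its events, the output is not interleaved, because the source subscription stays paused until the current inner stream is done.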