Stream asyncExpand(Stream convert(T event))

Creates a new stream by converting each event of this stream into a stream of its own and emitting that stream's events.

This acts like expand, except that convert returns a Stream instead of an Iterable. The events of each stream returned by convert become the events of the stream returned by asyncExpand, in the order they are produced.
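
As an illustration of the ordering described above, here is a minimal sketch. The helper names `labelled` and `expandDemo` are made up for this example, and it assumes a recent Dart SDK with `async*` generators. The first inner stream deliberately starts later than the second would, yet its events still come out first, because each inner stream is forwarded in full before the next source event is handled.

```dart
import 'dart:async';

// Hypothetical convert function: earlier events wait longer before emitting,
// so any interleaving of inner streams would reorder the output.
Stream<String> labelled(int event) async* {
  await Future<void>.delayed(Duration(milliseconds: 30 - 10 * event));
  yield '$event-a';
  yield '$event-b';
}

// Collects everything asyncExpand emits for the source events 1 and 2.
Future<List<String>> expandDemo() {
  return Stream.fromIterable([1, 2]).asyncExpand(labelled).toList();
}

Future<void> main() async {
  print(await expandDemo()); // [1-a, 1-b, 2-a, 2-b]
}
```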

If convert returns null, no value is put on the output stream, just as if it returned an empty stream.
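
A short sketch of the null case, assuming a null-safe Dart SDK in which the return type of convert is declared nullable (`Stream<E>?`). The function name `skipOddDemo` is invented for this example. Odd events produce no output at all; even events expand into two values.

```dart
import 'dart:async';

// Hypothetical demo: returning null from convert behaves like an empty stream.
Future<List<int>> skipOddDemo() {
  return Stream.fromIterable([1, 2, 3, 4]).asyncExpand<int>((n) {
    if (n.isOdd) return null; // nothing is emitted for this event
    return Stream.fromIterable([n, n * 10]);
  }).toList();
}

Future<void> main() async {
  print(await skipOddDemo()); // [2, 20, 4, 40]
}
```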

The returned stream is a broadcast stream if this stream is.
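
The broadcast rule can be checked without listening to anything. The sketch below uses a hypothetical `broadcastDemo` function and a throwaway controller; it only inspects `isBroadcast` on the result of asyncExpand for a single-subscription source and for a broadcast source.

```dart
import 'dart:async';

// Hypothetical demo: reports isBroadcast for both kinds of source stream.
List<bool> broadcastDemo() {
  final single = Stream.fromIterable([1, 2]);
  final broadcast = StreamController<int>.broadcast().stream;
  Stream<int> twice(int n) => Stream.fromIterable([n, n]);
  return [
    single.asyncExpand(twice).isBroadcast,
    broadcast.asyncExpand(twice).isBroadcast,
  ];
}

void main() {
  print(broadcastDemo()); // [false, true]
}
```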


Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
  StreamController/*<E>*/ controller;
  StreamSubscription<T> subscription;
  void onListen() {
    assert(controller is _StreamController ||
           controller is _BroadcastStreamController);
    final _EventSink/*<E>*/ eventSink =
        controller as Object /*=_EventSink<E>*/;
    subscription = this.listen(
        (T event) {
          Stream/*<E>*/ newStream;
          try {
            newStream = convert(event);
          } catch (e, s) {
            // A throwing convert becomes an error event; skip this event.
            controller.addError(e, s);
            return;
          }
          if (newStream != null) {
            // Pause the source until newStream has been forwarded in full.
            subscription.pause();
            controller.addStream(newStream)
                      .whenComplete(subscription.resume);
          }
        },
        onError: eventSink._addError,  // Avoid Zone error replacement.
        onDone: controller.close
    );
  }
  if (this.isBroadcast) {
    controller = new StreamController/*<E>*/.broadcast(
      onListen: onListen,
      onCancel: () { subscription.cancel(); },
      sync: true
    );
  } else {
    controller = new StreamController/*<E>*/(
      onListen: onListen,
      onPause: () { subscription.pause(); },
      onResume: () { subscription.resume(); },
      onCancel: () => subscription.cancel(),
      sync: true
    );
  }
  return controller.stream;
}