StreamTransformer<S, T> constructor

const StreamTransformer<S, T>(
  StreamSubscription<T> onListen(
    Stream<S> stream,
    bool cancelOnError
  )
)

Creates a StreamTransformer based on the given onListen callback.

The returned stream transformer uses the provided onListen callback when a transformed stream is listened to. At that time, the callback receives the input stream (the one passed to bind) and a boolean flag cancelOnError to create a StreamSubscription.

If the stream being transformed (the input stream passed to bind) is a broadcast stream, so is the stream returned by this transformer's StreamTransformer.bind method.
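
The broadcast rule can be checked with a small sketch. The names passThrough, passThroughTransformer and boundIsBroadcast are hypothetical and exist only for this illustration; the callback simply forwards the input's own subscription.

```dart
import 'dart:async';

/// Hypothetical pass-through callback: returns the input's own subscription.
StreamSubscription<int> passThrough(Stream<int> input, bool cancelOnError) =>
    input.listen(null, cancelOnError: cancelOnError);

/// Hypothetical transformer built from the callback above.
const passThroughTransformer = StreamTransformer<int, int>(passThrough);

/// Reports whether the stream returned by `bind` is a broadcast stream.
bool boundIsBroadcast(Stream<int> input) =>
    passThroughTransformer.bind(input).isBroadcast;

void main() {
  // A single-subscription input gives a single-subscription result.
  print(boundIsBroadcast(StreamController<int>().stream)); // false
  // A broadcast input gives a broadcast result.
  print(boundIsBroadcast(StreamController<int>.broadcast().stream)); // true
}
```

Nothing is listened to here, so onListen is never called; bind alone determines the isBroadcast value.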

If the transformed stream is listened to multiple times, the onListen callback is called again for each new Stream.listen call. This happens whether the stream is a broadcast stream or not, but the call will usually fail for non-broadcast streams.
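
As a rough illustration of the repeated call, the sketch below counts invocations of the callback. The counter onListenCalls and the names countingOnListen, counting and listenTwice are assumptions made for this example only. With a single-subscription input, the second input.listen call inside the callback would throw a StateError instead.

```dart
import 'dart:async';

/// Hypothetical counter, used only to observe how often the callback runs.
int onListenCalls = 0;

/// Counts each invocation, then forwards the input's own subscription.
StreamSubscription<int> countingOnListen(
    Stream<int> input, bool cancelOnError) {
  onListenCalls++;
  return input.listen(null, cancelOnError: cancelOnError);
}

const counting = StreamTransformer<int, int>(countingOnListen);

/// Listens twice to one bound broadcast stream and returns the call count.
Future<int> listenTwice() async {
  onListenCalls = 0;
  final source = StreamController<int>.broadcast();
  final bound = source.stream.transform(counting);
  final first = bound.listen((_) {});
  final second = bound.listen((_) {});
  await first.cancel();
  await second.cancel();
  return onListenCalls;
}

Future<void> main() async {
  print(await listenTwice()); // 2
}
```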

The onListen callback does not receive the handlers that were passed to Stream.listen. These are automatically set after the call to the onListen callback (using StreamSubscription.onData, StreamSubscription.onError and StreamSubscription.onDone).
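
A minimal sketch of this behavior, using hypothetical names (forward, forwarding, collect): the callback listens with a null data handler, yet the handlers given to Stream.listen on the transformed stream still receive every event, because they are installed on the returned subscription afterwards.

```dart
import 'dart:async';

/// The callback has no access to the listener's handlers, so it listens
/// with `null` and returns the subscription unchanged.
StreamSubscription<String> forward(Stream<String> input, bool cancelOnError) =>
    input.listen(null, cancelOnError: cancelOnError);

const forwarding = StreamTransformer<String, String>(forward);

/// Collects what the handlers passed to `listen` actually observe.
Future<List<String>> collect() async {
  final log = <String>[];
  final done = Completer<void>();
  Stream.fromIterable(['a', 'b']).transform(forwarding).listen(
    log.add,
    onDone: () {
      log.add('done');
      done.complete();
    },
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await collect()); // [a, b, done]
}
```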

Most commonly, an onListen callback will first call Stream.listen on the provided stream (with the corresponding cancelOnError flag), and then return a new StreamSubscription.

There are two common ways to create a StreamSubscription:

  1. by allocating a StreamController and returning the result of listening to its stream. It's important to forward pause, resume and cancel events (unless the transformer intentionally wants to change this behavior).
  2. by creating a new class that implements StreamSubscription. Note that the subscription should run callbacks in the Zone the stream was listened to (see Zone and Zone.bindCallback).
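
The second approach might look like the following sketch. MappingSubscription, labelOnListen, labeller and labelAll are hypothetical names, not part of the SDK. The wrapper delegates every member to the subscription obtained from the input stream, so pause, resume and cancel are forwarded, and it relies on that source subscription to run the handlers in the listener's zone rather than binding callbacks itself.

```dart
import 'dart:async';

/// Hypothetical wrapper: delegates to the source subscription and converts
/// each data event before passing it to the listener's data handler.
class MappingSubscription<S, T> implements StreamSubscription<T> {
  MappingSubscription(this._source, this._convert);

  final StreamSubscription<S> _source;
  final T Function(S) _convert;

  @override
  void onData(void Function(T data)? handleData) {
    if (handleData == null) {
      _source.onData(null);
      return;
    }
    _source.onData((S data) => handleData(_convert(data)));
  }

  @override
  void onError(Function? handleError) => _source.onError(handleError);

  @override
  void onDone(void Function()? handleDone) => _source.onDone(handleDone);

  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);

  @override
  void resume() => _source.resume();

  @override
  bool get isPaused => _source.isPaused;

  @override
  Future<void> cancel() => _source.cancel();

  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
}

/// Listens to the input and wraps the resulting subscription.
StreamSubscription<String> labelOnListen(
        Stream<int> input, bool cancelOnError) =>
    MappingSubscription<int, String>(
        input.listen(null, cancelOnError: cancelOnError), (n) => 'item $n');

const labeller = StreamTransformer<int, String>(labelOnListen);

/// Transforms the input and collects the converted events.
Future<List<String>> labelAll(Stream<int> input) =>
    input.transform(labeller).toList();

Future<void> main() async {
  print(await labelAll(Stream.fromIterable([1, 2]))); // [item 1, item 2]
}
```

This sketch does not catch exceptions thrown by the conversion function; a production implementation would likely need to decide how to report them.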

Example:

import 'dart:async';

/// Starts listening to [input] and duplicates all non-error events.
StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
  // Create the result controller.
  // Using `sync` is correct here, since only async events are forwarded.
  var controller = StreamController<int>(sync: true);
  controller.onListen = () {
    var subscription = input.listen((data) {
      // Duplicate the data.
      controller.add(data);
      controller.add(data);
    },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);
    // Controller forwards pause, resume and cancel events.
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = subscription.cancel;
  };
  // Return a new [StreamSubscription] by listening to the controller's
  // stream.
  return controller.stream.listen(null);
}

// Instantiate a transformer:
var duplicator = const StreamTransformer<int, int>(_onListen);

// Use as follows:
intStream.transform(duplicator);

Implementation

const factory StreamTransformer(
  StreamSubscription<T> onListen(Stream<S> stream, bool cancelOnError),
) = _StreamSubscriptionTransformer<S, T>;