const StreamTransformer(StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))

Creates a StreamTransformer.

The returned instance takes responsibility for implementing bind. When the user invokes bind, it returns a new "bound" stream. Only when the user starts listening to the bound stream does the listen method invoke the given transformer closure.

The transformer closure receives the stream that was passed to bind as its argument and returns a StreamSubscription. In almost all cases the closure itself listens to the stream it is given.

The result of invoking the transformer closure is a StreamSubscription. The bound stream (created by the bind method above) then sets on that subscription the handlers it received as part of the listen call.

Conceptually this can be summarized as follows:

  1. var transformer = new StreamTransformer(transformerClosure); creates a StreamTransformer that supports the bind method.

  2. var boundStream = stream.transform(transformer); binds the stream and returns a bound stream that holds a reference to stream.

  3. boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b) starts the listening and transformation. This is accomplished in two steps: first, the boundStream invokes the transformerClosure with the stream it captured: transformerClosure(stream, b). The resulting subscription, a StreamSubscription, is then updated to receive its handlers: subscription.onData(f1), subscription.onError(f2), subscription.onDone(f3). Finally, the subscription is returned as the result of the listen call.
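The three steps above can be traced with a small sketch. It assumes a null-safe Dart SDK; the names log, transformerClosure and runSteps are hypothetical and exist only for illustration. The closure simply listens to the stream it receives and returns that subscription, so the bound stream forwards events unchanged.

```dart
import 'dart:async';

/// Records what happens, in order, so the sequence can be inspected.
final List<String> log = <String>[];

/// Hypothetical closure: listens to the stream it is given and returns
/// that subscription without installing any handlers itself.
StreamSubscription<int> transformerClosure(
    Stream<int> stream, bool cancelOnError) {
  log.add('closure invoked (cancelOnError: $cancelOnError)');
  return stream.listen(null, cancelOnError: cancelOnError);
}

Future<List<String>> runSteps() async {
  log.clear();

  // Step 1: create the transformer. Nothing is listened to yet.
  final transformer = StreamTransformer<int, int>(transformerClosure);

  // Step 2: bind. This only captures the source stream.
  final boundStream = Stream<int>.fromIterable([1, 2]).transform(transformer);
  log.add('bound');

  // Step 3: listen. The closure runs now, and the handlers passed here
  // are then set on the subscription it returned.
  final done = Completer<void>();
  boundStream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error $error'),
    onDone: () {
      log.add('done');
      done.complete();
    },
    cancelOnError: true,
  );
  await done.future;
  return List<String>.of(log);
}

Future<void> main() async {
  (await runSteps()).forEach(print);
}
```

In this sketch 'bound' is logged before 'closure invoked', which reflects that binding alone does not start the transformation.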

There are two common ways to create a StreamSubscription:

  1. by creating a new class that implements StreamSubscription. Note that the subscription should run its callbacks in the Zone in which the stream was listened to.

  2. by allocating a StreamController and returning the result of listening to its stream.
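As a rough illustration of the first approach, the sketch below wraps a subscription on the source stream and converts each event before handing it to the listener. It assumes a null-safe Dart SDK. MappingSubscription, describeInts and runMapping are hypothetical names, not part of dart:async. The wrapper relies on the wrapped subscription for pausing, cancelling and Zone handling, which is a simplification. It also does not catch errors thrown by the conversion function.

```dart
import 'dart:async';

/// Hypothetical wrapper: a StreamSubscription<T> that delegates to a
/// subscription on the source stream and converts each event.
class MappingSubscription<S, T> implements StreamSubscription<T> {
  MappingSubscription(this._source, this._convert);

  final StreamSubscription<S> _source;
  final T Function(S) _convert;

  @override
  void onData(void Function(T data)? handleData) {
    // Install a handler on the source that converts before forwarding.
    _source.onData(
        handleData == null ? null : (S event) => handleData(_convert(event)));
  }

  @override
  void onError(Function? handleError) => _source.onError(handleError);

  @override
  void onDone(void Function()? handleDone) => _source.onDone(handleDone);

  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);

  @override
  void resume() => _source.resume();

  @override
  bool get isPaused => _source.isPaused;

  @override
  Future<void> cancel() => _source.cancel();

  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
}

/// Hypothetical transformer that turns each int into a description.
StreamTransformer<int, String> describeInts() {
  return StreamTransformer<int, String>((input, cancelOnError) {
    return MappingSubscription<int, String>(
        input.listen(null, cancelOnError: cancelOnError), (n) => 'value $n');
  });
}

Future<List<String>> runMapping() =>
    Stream<int>.fromIterable([1, 2, 3]).transform(describeInts()).toList();

Future<void> main() async {
  print(await runMapping()); // [value 1, value 2, value 3]
}
```

The second approach, based on a StreamController, usually needs less code because the controller already provides a conforming subscription.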

Example use of a duplicating transformer:

// Requires: import 'dart:async';
stringStream.transform(new StreamTransformer<String, String>(
    (Stream<String> input, bool cancelOnError) {
      StreamController<String> controller;
      StreamSubscription<String> subscription;
      controller = new StreamController<String>(
        // Subscribe to the input only once the bound stream is listened to.
        onListen: () {
          subscription = input.listen((data) {
              // Duplicate the data.
              controller.add(data);
              controller.add(data);
            },
            onError: controller.addError,
            onDone: controller.close,
            cancelOnError: cancelOnError);
        },
        // Forward pause, resume and cancel to the input subscription.
        onPause: () { subscription.pause(); },
        onResume: () { subscription.resume(); },
        onCancel: () => subscription.cancel(),
        sync: true);
      // The bound stream sets the real handlers on this subscription.
      return controller.stream.listen(null);
    }));
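For readers who want to run the duplicating transformer end to end, here is a sketch of it as a complete program. It assumes a null-safe Dart SDK, hence the late declarations, which older SDKs do not support. The function names duplicating and runDuplicating are hypothetical.

```dart
import 'dart:async';

/// Hypothetical helper that builds the duplicating transformer.
StreamTransformer<String, String> duplicating() {
  return StreamTransformer<String, String>((input, cancelOnError) {
    late StreamSubscription<String> subscription;
    late StreamController<String> controller;
    controller = StreamController<String>(
      onListen: () {
        subscription = input.listen(
          (data) {
            // Emit every event twice.
            controller.add(data);
            controller.add(data);
          },
          onError: controller.addError,
          onDone: controller.close,
          cancelOnError: cancelOnError,
        );
      },
      onPause: () => subscription.pause(),
      onResume: () => subscription.resume(),
      onCancel: () => subscription.cancel(),
      sync: true,
    );
    // The handlers are filled in later by the bound stream's listen call.
    return controller.stream.listen(null);
  });
}

Future<List<String>> runDuplicating() =>
    Stream<String>.fromIterable(['a', 'b']).transform(duplicating()).toList();

Future<void> main() async {
  print(await runDuplicating()); // [a, a, b, b]
}
```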

Source

const factory StreamTransformer(
    StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
    = _StreamSubscriptionTransformer<S, T>;