Creates a StreamTransformer.
The returned instance takes responsibility for implementing bind. When the user invokes bind, it returns a new "bound" stream. Only when the user starts listening to the bound stream does the listen method invoke the given transformer closure.
The transformer closure receives the stream that was bound as its argument and returns a StreamSubscription. In almost all cases the closure itself listens to the stream it is given as argument.
The result of invoking the transformer closure is a StreamSubscription. The bound stream (created by the bind method above) then sets on that subscription the handlers it received as part of the listen call.
Conceptually this can be summarized as follows:
- var transformer = new StreamTransformer(transformerClosure); creates a StreamTransformer that supports the bind method.
- var boundStream = stream.transform(transformer); binds the stream and returns a bound stream that has a pointer to stream.
- boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b) starts the listening and transformation. This is accomplished in two steps: first the boundStream invokes the transformerClosure with the stream it captured: transformerClosure(stream, b). The resulting subscription, a StreamSubscription, is then updated to receive its handlers: subscription.onData(f1), subscription.onError(f2), subscription.onDone(f3). Finally the subscription is returned as the result of the listen call.
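The three steps above can be observed with a small sketch. It assumes a null-safe Dart SDK, and the names traceLifecycle, closureCalls, and received are hypothetical, introduced only for illustration. The closure simply hands back the source stream's own subscription, so the only thing being shown is when the closure runs.

```dart
import 'dart:async';

// Hypothetical demo: returns how many times the closure had run after
// binding and after listening, plus the events that were delivered.
Future<List<Object>> traceLifecycle() async {
  var closureCalls = 0;

  // Step 1: a pass-through closure that returns the source stream's own
  // subscription; it only counts its invocations.
  final transformer = StreamTransformer<int, int>(
      (Stream<int> stream, bool cancelOnError) {
    closureCalls++;
    return stream.listen(null, cancelOnError: cancelOnError);
  });

  // Step 2: binding alone does not invoke the closure.
  final boundStream =
      Stream<int>.fromIterable([1, 2, 3]).transform(transformer);
  final callsAfterBind = closureCalls;

  // Step 3: listen invokes the closure, then installs the handlers on
  // the subscription the closure returned.
  final received = <int>[];
  final done = Completer<void>();
  boundStream.listen(received.add, onDone: done.complete);
  final callsAfterListen = closureCalls;

  await done.future;
  return [callsAfterBind, callsAfterListen, received];
}

Future<void> main() async {
  print(await traceLifecycle()); // prints [0, 1, [1, 2, 3]]
}
```

The counter stays at 0 after transform and becomes 1 as soon as listen is called, which matches the description that the closure runs only when the bound stream is listened to.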
There are two common ways to create a StreamSubscription:
- by creating a new class that implements StreamSubscription. Note that the subscription should run callbacks in the Zone in which the stream was listened to.
- by allocating a StreamController and returning the result of listening to its stream.
Example use of a duplicating transformer:
stringStream.transform(new StreamTransformer<String, String>(
    (Stream<String> input, bool cancelOnError) {
      StreamController<String> controller;
      StreamSubscription<String> subscription;
      controller = new StreamController<String>(
        onListen: () {
          subscription = input.listen((data) {
              // Duplicate the data.
              controller.add(data);
              controller.add(data);
            },
            onError: controller.addError,
            onDone: controller.close,
            cancelOnError: cancelOnError);
        },
        onPause: () { subscription.pause(); },
        onResume: () { subscription.resume(); },
        onCancel: () => subscription.cancel(),
        sync: true);
      return controller.stream.listen(null);
    }));
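The example above is a fragment, since stringStream is not defined there. A self-contained variant might look like the following sketch. It assumes a null-safe Dart SDK (hence the late declarations), and the names duplicate and duplicated are hypothetical helpers introduced only for illustration.

```dart
import 'dart:async';

// The duplicating closure from the example above, written as a top-level
// function for null-safe Dart.
StreamSubscription<String> duplicate(
    Stream<String> input, bool cancelOnError) {
  late StreamController<String> controller;
  late StreamSubscription<String> subscription;
  controller = StreamController<String>(
      onListen: () {
        // Start listening to the source only once the output is listened to.
        subscription = input.listen((data) {
          // Duplicate the data.
          controller.add(data);
          controller.add(data);
        },
            onError: controller.addError,
            onDone: controller.close,
            cancelOnError: cancelOnError);
      },
      // Forward pause, resume, and cancel to the source subscription.
      onPause: () => subscription.pause(),
      onResume: () => subscription.resume(),
      onCancel: () => subscription.cancel(),
      sync: true);
  // The handlers are installed later by the bound stream's listen call.
  return controller.stream.listen(null);
}

// Hypothetical helper: applies the transformer and collects the output.
Future<List<String>> duplicated(Iterable<String> values) {
  return Stream<String>.fromIterable(values)
      .transform(StreamTransformer<String, String>(duplicate))
      .toList();
}

Future<void> main() async {
  print(await duplicated(['a', 'b'])); // prints [a, a, b, b]
}
```

Returning controller.stream.listen(null) is the second of the two approaches listed earlier: the controller's subscription serves as the StreamSubscription, and the bound stream later attaches the caller's handlers to it.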
Source
const factory StreamTransformer(
    StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) =
    _StreamSubscriptionTransformer<S, T>;