Creates a new stream with the same events as this stream.
Whenever more than `timeLimit` passes between two events from this stream, the `onTimeout` function is called.
The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when the stream is paused and resumed.
The `onTimeout` function is called with one argument: an `EventSink` that allows putting events into the returned stream. This `EventSink` is only valid during the call to `onTimeout`.
If `onTimeout` is omitted, a timeout will just put a `TimeoutException` into the error channel of the returned stream.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will have its own individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Source
/// Forwards this stream's events to a new stream and runs a timeout action
/// whenever [timeLimit] elapses without a data or error event.
///
/// If [onTimeout] is null, a timeout adds a [TimeoutException] as an error
/// to the returned stream. Otherwise [onTimeout] is run in the zone that was
/// current when the returned stream was listened to, with a sink wrapper that
/// is attached to the controller only for the duration of the call.
///
/// The timer is first started when the returned stream is listened to. It is
/// restarted on every data or error event, and cancelled on done and cancel.
/// For a non-broadcast stream it is also cancelled on pause and restarted on
/// resume.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
  StreamController<T> controller;
  // The following variables are set on listen.
  StreamSubscription<T> subscription;
  Timer timer;
  Zone zone;
  _TimerCallback timeout;

  void onData(T event) {
    timer.cancel();
    // The controller is synchronous, so add() may run listener code that
    // cancels or pauses the subscription. Create the replacement timer
    // *before* forwarding the event so that such a cancel/pause stops the
    // new timer instead of leaving it running afterwards.
    timer = zone.createTimer(timeLimit, timeout);
    controller.add(event);
  }

  void onError(error, StackTrace stackTrace) {
    timer.cancel();
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    // Restart the timer before forwarding, for the same reason as in onData.
    timer = zone.createTimer(timeLimit, timeout);
    dynamic eventSink = controller;
    eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
  }

  void onDone() {
    timer.cancel();
    controller.close();
  }

  void onListen() {
    // This is the onListen callback of controller.
    // It runs in the same zone that the subscription was created in.
    // Use that zone for creating timers and running the onTimeout
    // callback.
    zone = Zone.current;
    if (onTimeout == null) {
      // Default action: report the timeout as an error on the returned stream.
      timeout = () {
        controller.addError(new TimeoutException("No stream event",
            timeLimit), null);
      };
    } else {
      // TODO(floitsch): the return type should be 'void', and the type
      // should be inferred.
      var registeredOnTimeout =
          zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout);
      _ControllerEventSinkWrapper wrapper =
          new _ControllerEventSinkWrapper(null);
      timeout = () {
        wrapper._sink = controller; // Only valid during call.
        zone.runUnaryGuarded(registeredOnTimeout, wrapper);
        wrapper._sink = null;
      };
    }
    subscription = this.listen(onData, onError: onError, onDone: onDone);
    timer = zone.createTimer(timeLimit, timeout);
  }

  Future onCancel() {
    timer.cancel();
    Future result = subscription.cancel();
    subscription = null;
    return result;
  }

  // A broadcast controller only gets listen/cancel callbacks; a
  // single-subscription controller additionally gets pause/resume callbacks
  // that stop and restart the countdown.
  controller = isBroadcast
      ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
      : new _SyncStreamController<T>(
          onListen,
          () {
            // Don't null the timer, onCancel may call cancel again.
            timer.cancel();
            subscription.pause();
          },
          () {
            subscription.resume();
            timer = zone.createTimer(timeLimit, timeout);
          },
          onCancel);
  return controller.stream;
}