timeout method Null safety
Creates a new stream with the same events as this stream.
Whenever more than timeLimit
passes between two events from this stream,
the onTimeout
function is called, which can emit further events on
the returned stream.
The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.
The onTimeout
function is called with one argument: an
EventSink that allows putting events into the returned stream.
This EventSink is only valid during the call to onTimeout.
Calling EventSink.close on the sink passed to onTimeout
closes the
returned stream, and no further events are processed.
If onTimeout
is omitted, a timeout will just put a TimeoutException
into the error channel of the returned stream.
If the call to onTimeout
throws, the error is emitted on the returned
stream.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will have its individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Implementation
/// Returns a stream that forwards this stream's events and runs a timeout
/// action whenever [timeLimit] elapses without a forwarded event.
///
/// When [onTimeout] is omitted, the timeout action adds a [TimeoutException]
/// as an error on the returned stream. Otherwise [onTimeout] is registered in
/// the zone that is current when `timeout` is called and is later run there
/// with a sink that is attached to the returned stream only for the duration
/// of that call.
///
/// The timer is first created when the returned stream is listened to, is
/// recreated on every data or error event from this stream, and is cancelled
/// when this stream is done or the listener cancels. For non-broadcast
/// streams, pausing cancels the timer and resuming creates a new one.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {
  _StreamControllerBase<T> controller;
  if (!isBroadcast) {
    controller = _SyncStreamController<T>(null, null, null, null);
  } else {
    controller = _SyncBroadcastStreamController<T>(null, null);
  }

  Zone zone = Zone.current;

  // The callback is registered with the zone right away, not lazily on listen.
  _TimerCallback timeoutCallback;
  if (onTimeout != null) {
    // TODO(floitsch): the return type should be 'void', and the type
    // should be inferred.
    var boundOnTimeout =
        zone.registerUnaryCallback<void, EventSink<T>>(onTimeout);
    var sinkProxy = _ControllerEventSinkWrapper<T>(null);
    timeoutCallback = () {
      // The proxy is attached to the controller only while the user callback
      // runs, and detached again immediately afterwards.
      sinkProxy._sink = controller;
      zone.runUnaryGuarded(boundOnTimeout, sinkProxy);
      sinkProxy._sink = null;
    };
  } else {
    timeoutCallback = () {
      controller.addError(TimeoutException("No stream event", timeLimit), null);
    };
  }

  // Everything else is deferred until the returned stream gets a listener.
  controller.onListen = () {
    Timer countdown = zone.createTimer(timeLimit, timeoutCallback);
    var source = this.listen(null);

    // Cancels the running timer and starts a fresh one.
    void restartCountdown() {
      countdown.cancel();
      countdown = zone.createTimer(timeLimit, timeoutCallback);
    }

    // Event forwarding: data and error events both restart the countdown.
    source.onData((T event) {
      // The controller is synchronous, so `add` may close the stream and
      // cancel the timer. The new timer therefore has to exist before `add`
      // is called; see https://github.com/dart-lang/sdk/issues/37565
      restartCountdown();
      controller.add(event);
    });
    source.onError((Object error, StackTrace stackTrace) {
      restartCountdown();
      // `_addError` is used here to avoid Zone error replacement.
      controller._addError(error, stackTrace);
    });
    source.onDone(() {
      countdown.cancel();
      controller.close();
    });

    // Remaining controller callbacks.
    controller.onCancel = () {
      countdown.cancel();
      return source.cancel();
    };

    // Pause/resume hooks are only installed for non-broadcast streams.
    if (isBroadcast) return;
    controller.onPause = () {
      countdown.cancel();
      source.pause();
    };
    controller.onResume = () {
      source.resume();
      countdown = zone.createTimer(timeLimit, timeoutCallback);
    };
  };

  return controller.stream;
}