timeout method

Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)})

Creates a new stream with the same events as this stream.

Whenever more than timeLimit passes between two events from this stream, the onTimeout function is called, which can emit further events on the returned stream.

The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.

The onTimeout function is called with one argument: an EventSink that allows putting events into the returned stream. This EventSink is only valid during the call to onTimeout. Calling EventSink.close on the sink passed to onTimeout closes the returned stream, and no further events are processed.
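As a minimal sketch of the sink behavior described above, the following uses onTimeout to emit a fallback value and then close the returned stream. The source stream `stalledSource`, the helper `collectWithFallback`, the sentinel value -1, and the durations are all hypothetical names and values chosen for illustration; they are not part of the API.

```dart
import 'dart:async';

/// Hypothetical source: emits 1, then goes quiet for 200 ms before emitting 2.
Stream<int> stalledSource() async* {
  yield 1;
  await Future<void>.delayed(const Duration(milliseconds: 200));
  yield 2; // Not delivered once the timeout handler has closed the stream.
}

/// Collects the events of the timed stream into a list.
Future<List<int>> collectWithFallback() {
  return stalledSource()
      .timeout(const Duration(milliseconds: 50),
          onTimeout: (EventSink<int> sink) {
    sink.add(-1); // Hypothetical sentinel meaning "the source stalled".
    sink.close(); // Closes the returned stream; later events are dropped.
  }).toList();
}

Future<void> main() async {
  print(await collectWithFallback()); // Expected: [1, -1]
}
```

Because the sink is only valid during the call, a handler like this should do all of its adds synchronously rather than storing the sink for later use.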

If onTimeout is omitted, a timeout will just put a TimeoutException into the error channel of the returned stream. If the call to onTimeout throws, the error is emitted on the returned stream.
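The default behavior can be sketched as follows, assuming a hypothetical source `slowSource` that pauses longer than the time limit between two events. The helper `logDefaultTimeout`, the log strings, and the durations are illustrative assumptions. The sketch records what the listener receives: the TimeoutException arrives as an error event, and the stream remains open for later data.

```dart
import 'dart:async';

/// Hypothetical source: emits 1, goes quiet for 200 ms, then emits 2.
Stream<int> slowSource() async* {
  yield 1;
  await Future<void>.delayed(const Duration(milliseconds: 200));
  yield 2;
}

/// Listens to the timed stream and logs every data and error event.
Future<List<String>> logDefaultTimeout() {
  final log = <String>[];
  final done = Completer<List<String>>();
  slowSource().timeout(const Duration(milliseconds: 50)).listen(
    (value) => log.add('data: $value'),
    // With onTimeout omitted, a timeout shows up here as a TimeoutException.
    onError: (Object error) =>
        log.add(error is TimeoutException ? 'timeout' : 'error: $error'),
    onDone: () => done.complete(log),
  );
  return done.future;
}

Future<void> main() async {
  print(await logDefaultTimeout()); // Expected: [data: 1, timeout, data: 2]
}
```

A listener created with `cancelOnError: true` would instead stop at the first timeout, so whether the exception ends the subscription depends on how the stream is consumed.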

The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription has its own timer that starts counting on listen, and the subscriptions' timers can be paused individually.
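A small check of the first sentence above, using two StreamController instances as stand-in sources. The helper `timedIsBroadcast` and the one-second limit are hypothetical; nothing is listened to here, so no timers are started.

```dart
import 'dart:async';

/// Returns whether the timed view of [source] is a broadcast stream.
bool timedIsBroadcast(Stream<int> source) =>
    source.timeout(const Duration(seconds: 1)).isBroadcast;

void main() {
  final single = StreamController<int>();
  final broadcast = StreamController<int>.broadcast();

  print(timedIsBroadcast(single.stream)); // false
  print(timedIsBroadcast(broadcast.stream)); // true
}
```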

Implementation

Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
  StreamController<T> controller;
  // The following variables are set on listen.
  StreamSubscription<T> subscription;
  Timer timer;
  Zone zone;
  _TimerCallback timeout;

  void onData(T event) {
    timer.cancel();
    controller.add(event);
    timer = zone.createTimer(timeLimit, timeout);
  }

  void onError(error, StackTrace stackTrace) {
    timer.cancel();
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    dynamic eventSink = controller;
    eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
    timer = zone.createTimer(timeLimit, timeout);
  }

  void onDone() {
    timer.cancel();
    controller.close();
  }

  void onListen() {
    // This is the onListen callback of the controller.
    // It runs in the same zone that the subscription was created in.
    // Use that zone for creating timers and running the onTimeout
    // callback.
    zone = Zone.current;
    if (onTimeout == null) {
      timeout = () {
        controller.addError(
            new TimeoutException("No stream event", timeLimit), null);
      };
    } else {
      // TODO(floitsch): the return type should be 'void', and the type
      // should be inferred.
      var registeredOnTimeout =
          zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
      var wrapper = new _ControllerEventSinkWrapper<T>(null);
      timeout = () {
        wrapper._sink = controller; // Only valid during call.
        zone.runUnaryGuarded(registeredOnTimeout, wrapper);
        wrapper._sink = null;
      };
    }

    subscription = this.listen(onData, onError: onError, onDone: onDone);
    timer = zone.createTimer(timeLimit, timeout);
  }

  Future onCancel() {
    timer.cancel();
    Future result = subscription.cancel();
    subscription = null;
    return result;
  }

  controller = isBroadcast
      ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
      : new _SyncStreamController<T>(onListen, () {
          // Don't null the timer, onCancel may call cancel again.
          timer.cancel();
          subscription.pause();
        }, () {
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeout);
        }, onCancel);
  return controller.stream;
}