Rxjs: Throttle operator doesn't work correctly with sync duration selector

Created on 17 May 2020 · 1 Comment · Source: ReactiveX/rxjs

Bug Report

Current Behavior
If a sync duration selector is used with the throttle operator, throttle does not recognise that the duration Observable has emitted, so it keeps throttling every subsequent event.

According to the throttle documentation, the duration selector is "A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an Observable or a Promise.", hence I would expect it to work with sync streams. Besides, the same setup works as expected with the debounce operator.

Reproduction

https://stackblitz.com/edit/rxjs-w7dbvs

Please check the console: the throttle operators with async duration selectors and the debounce operator all keep emitting events, while the throttle operator with a sync duration selector keeps ignoring them, as if the duration selector had emitted nothing.

Expected behavior
The throttle operator should work with a sync Observable the same way it works with an async one (and the same way the debounce operator does). If a sync duration selector is used, throttle should emit each subsequent event immediately instead of ignoring it.

Environment

  • Runtime: Chrome
  • RxJS version: both ^6.5.2 and 7.0.0-beta.0.

Most helpful comment

I think the problem is here:

https://github.com/ReactiveX/rxjs/blob/f5c6332b6098ec5c302f8034ad2273106e301275/src/internal/operators/throttle.ts#L125-L130

Considering your first example:

source.pipe(
  throttle(x => of(true))
).subscribe(x => console.log('a ', x), null, () => console.log('completed'));

duration will be of(true), an observable that synchronously emits and then completes.

subscribeToResult(this, duration) subscribes to this observable and, because everything happens synchronously, the inner subscription has already completed by the time subscribeToResult returns. Therefore, a closed inner subscriber is assigned to this._throttled.
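The ordering described above can be sketched without RxJS. This is only a minimal model: `FakeSubscription`, `subscribeSync`, and `demoOrdering` are hypothetical names made up for illustration, and the callback stands in for whatever the operator does when the duration notifies. It shows that when the notification fires before the subscribe call returns, the assignment runs last and overwrites the reset.

```typescript
// Hypothetical stand-in for an RxJS Subscription; only `closed` matters here.
interface FakeSubscription {
  closed: boolean;
}

// Hypothetical stand-in for subscribing to `of(true)`: it notifies the caller
// before returning, so the subscription it hands back is already closed.
function subscribeSync(onNotify: () => void): FakeSubscription {
  const sub: FakeSubscription = { closed: false };
  onNotify(); // inner value and completion arrive synchronously
  sub.closed = true;
  return sub;
}

function demoOrdering(): { steps: string[]; throttled: FakeSubscription | null } {
  const state: { throttled: FakeSubscription | null } = { throttled: null };
  const steps: string[] = [];

  // Same shape as `this._throttled = subscribeToResult(this, duration)`.
  state.throttled = subscribeSync(() => {
    steps.push('reset to null');
    state.throttled = null; // the reset runs first...
  });
  steps.push('assigned closed subscriber'); // ...then the assignment undoes it

  return { steps, throttled: state.throttled };
}

const orderingResult = demoOrdering();
console.log(orderingResult.steps);
console.log(orderingResult.throttled);
```

In this model the final state holds a subscription whose `closed` flag is true, which matches the situation the comment describes.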

And here is what happens when a new outer value arrives:

https://github.com/ReactiveX/rxjs/blob/f5c6332b6098ec5c302f8034ad2273106e301275/src/internal/operators/throttle.ts#L102-L113

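Because this._throttled is not null, the new value is dropped and no new duration is started. The inner subscriber stored there has already completed, so nothing will ever reset this._throttled, and the operator stays throttled.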
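As a rough illustration of that gate, assuming (as described above) that the linked lines only test whether this._throttled is set: `FakeSubscription` and `passesGate` below are hypothetical names, not RxJS code. A stored subscription blocks values regardless of whether it is still open.

```typescript
// Hypothetical stand-in for an RxJS Subscription; only `closed` matters here.
interface FakeSubscription {
  closed: boolean;
}

// Simplified model of the gate: it checks only whether a throttle subscription
// is stored, not whether that subscription is still open.
function passesGate(throttled: FakeSubscription | null): boolean {
  return !throttled;
}

const gateWhenIdle = passesGate(null); // nothing stored: value passes
const gateWhenActive = passesGate({ closed: false }); // open duration: value is throttled
const gateWhenStale = passesGate({ closed: true }); // closed duration: still throttled

console.log({ gateWhenIdle, gateWhenActive, gateWhenStale });
```

The last case is the problematic one: a closed subscriber can no longer notify anyone, so the gate never reopens.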

A quick fix I think would be this:

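private throttle(value: T): void {
  const duration = this.tryDurationSelector(value);
  if (duration) {
    const innerSubscriber = subscribeToResult(this, duration);

    // Only track the inner subscriber if it is still open; a synchronous
    // duration has already completed, so there is nothing left to wait for.
    if (!innerSubscriber.closed) {
      this.add(this._throttled = innerSubscriber);
    }
  }
}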
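To see what the closed check changes, here is a minimal standalone model of a leading-edge throttle. It is a sketch under stated assumptions, not the RxJS implementation: `ThrottleModel`, `Duration`, `syncDuration`, `runModel`, and the `guardClosed` flag are all hypothetical, and trailing emissions are left out.

```typescript
// Hypothetical stand-ins; none of these names are RxJS APIs.
interface FakeSubscription {
  closed: boolean;
}

// A duration is modelled as a function that subscribes and returns a subscription.
// `done` plays the role of the notification that ends the throttle window.
type Duration = (done: () => void) => FakeSubscription;

// Models `of(true)`: notifies synchronously, so the returned subscription is closed.
const syncDuration: Duration = (done) => {
  const sub: FakeSubscription = { closed: false };
  done();
  sub.closed = true;
  return sub;
};

class ThrottleModel {
  readonly emitted: number[] = [];
  private throttled: FakeSubscription | null = null;

  constructor(
    private readonly duration: Duration,
    private readonly guardClosed: boolean, // true = apply the proposed closed check
  ) {}

  next(value: number): void {
    if (this.throttled) {
      return; // still throttling: drop the value
    }
    this.emitted.push(value); // leading emission
    const inner = this.duration(() => {
      this.throttled = null; // throttle window finished
    });
    if (!this.guardClosed || !inner.closed) {
      this.throttled = inner; // without the guard, a closed subscriber is stored
    }
  }
}

function runModel(guardClosed: boolean): number[] {
  const model = new ThrottleModel(syncDuration, guardClosed);
  [1, 2, 3].forEach((v) => model.next(v));
  return model.emitted;
}

console.log(runModel(false)); // without the check, only the first value gets through
console.log(runModel(true)); // with the check, all three values get through
```

In this model the unguarded run emits only 1, while the guarded run emits 1, 2, and 3, which is the behaviour the report expects from a sync duration selector.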
