Current Behavior
If a sync duration selector is used in throttle operator, throttle operator won't recognise it has emitted an event and will keep on throttling all other events.
According to throttle documentation duration selector is "A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an Observable or a Promise.", hence I would expect it to work with sync streams. Besides, the same behaviour does work as expected with the debounce operator.
Reproduction
https://stackblitz.com/edit/rxjs-w7dbvs
Please see console, to see all both async duration selectors and debounce operator keep on emitting events, while throttle operator with sync duration selector keeps on ignoring them as if no event was emitted by duration selector.
Expected behavior
The throttle operator should work with a sync Observable the same way it does work with async one (and the same way debounce operator does). If sync duration selector is used, throttle should keep on immediately emitting the following events and not ignoring them.
Environment
I think the problem is here:
Considering your first example:
source.pipe(
throttle(x => of(true))
).subscribe(x => console.log('a ', x), null, () => console.log('completed'));
duration will be of(true), an observable that synchronously emits and then completes.
subscribeToResult(this, duration) will subscribe to this observable and because everything happens synchronously, it also completes. Therefore, a closed inner subscriber will be assigned to this._throttled.
And here is what happens when a new outer value arrives:
because this._throttled is not null, nothing will happen, meaning that it can't be emptied out.
A quick fix I think would be this:
private throttle(value: T): void {
const duration = this.tryDurationSelector(value);
if (!!duration) {
const innerSubscriber = subscribeToResult(this, duration);
!innerSubscriber.closed && this.add(this._throttled = innerSubscriber);
}
}
Most helpful comment
I think the problem is here:
https://github.com/ReactiveX/rxjs/blob/f5c6332b6098ec5c302f8034ad2273106e301275/src/internal/operators/throttle.ts#L125-L130
Considering your first example:
durationwill beof(true), an observable that synchronously emits and then completes.subscribeToResult(this, duration)will subscribe to this observable and because everything happens synchronously, it also completes. Therefore, a closed inner subscriber will be assigned tothis._throttled.And here is what happens when a new outer value arrives:
https://github.com/ReactiveX/rxjs/blob/f5c6332b6098ec5c302f8034ad2273106e301275/src/internal/operators/throttle.ts#L102-L113
because
this._throttledis not null, nothing will happen, meaning that it can't be emptied out.A quick fix I think would be this: