Platform: Component store race condition issue

Created on 9 Jun 2020 · 4 Comments · Source: ngrx/platform

Minimal reproduction of the bug/regression with instructions:

https://stackblitz.com/edit/ngrx-component-store-race-condition?file=index.ts

Actual output in the console:

> field2 false
> state {field1: false, field2: false}
> field2 true
> state {field1: true, field2: true}
> field2 false
> state {field1: true, field2: false}

Expected behavior:

> field2 false
> state {field1: false, field2: false}
> state {field1: true, field2: false}
> field2 true
> state {field1: true, field2: true}

The last emitted state and field2 value are expected to match what is in the store.

Versions of NgRx, Angular, Node, affected browser(s) and operating system(s):

Latest version of https://github.com/ngrx/platform/blob/master/modules/component-store/src/component-store.ts

Other information:

We have the same issue in our own small store library and would be glad to know how to fix it properly.
We added an optional lock parameter to the update method and moved any intermediate updates (those fired before all subscribers of the first update have received the new state value) into a queue. But this solution has its own problem: the state value is wrong right after the second update call. The caller expects the state to be updated, but it is not, because the update is still in the queue.

I would be willing to submit a PR to fix this issue

[ ] Yes (Assistance is provided if you need help submitting a pull request)
[x] No

Component Store

All 4 comments

The "problem" is that an updater is invoked in the subscribe of a selector.
(This is actually an anti-pattern)

store.select(state => state.field1)
  .pipe(filter(field1 => field1))
  .subscribe(() => store.updater(state => ({ ...state, field2: true }))());

In @ngrx/store this is resolved by using a queueScheduler.

I've updated the stackblitz example and it seems to fix this issue.

cc @alex-okrushko

updater<V>(
    updaterFn: (state: T, value: V) => T
  ): unknown extends V ? () => void : (t: V | Observable<V>) => Subscription {
    return ((observableOrValue?: V | Observable<V>): Subscription => {
      let initializationError: Error | undefined;
      // We can receive either the value or an observable. In case it's a
      // simple value, we'll wrap it with `of` operator to turn it into
      // Observable.
      const observable$ = isObservable(observableOrValue)
        ? observableOrValue
        : of(observableOrValue);
      const subscription = observable$
        .pipe(
+         observeOn(queueScheduler),
          concatMap((value) =>
            this.isInitialized
              ? of(value).pipe(withLatestFrom(this.stateSubject$))
              : // If state was not initialized, we'll throw an error.
                throwError(
                  Error(`${this.constructor.name} has not been initialized`)
                )
          ),
          takeUntil(this.destroy$)
        )
        .subscribe({
          next: ([value, currentState]) => {
            this.stateSubject$.next(updaterFn(currentState, value!));
          },
          error: (error: Error) => {
            initializationError = error;
            this.stateSubject$.error(error);
          },
        });

      if (initializationError) {
        // prettier-ignore
        throw /** @type {!Error} */ (initializationError);
      }
      return subscription;
    }) as unknown extends V
      ? () => void
      : (t: V | Observable<V>) => Subscription;
  }
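
To make the ordering problem concrete without pulling in RxJS, here is a minimal, dependency-free sketch. `TinyStore`, its `queued` flag, and `run` are hypothetical names invented for this illustration; they are not part of ComponentStore. The queued branch only approximates the trampolining idea behind `queueScheduler` (a nested update waits until the current emission has reached every listener) and is not the actual NgRx implementation.

```typescript
type Listener<T> = (state: T) => void;
type Updater<T> = (state: T) => T;

// Hypothetical stand-in for a synchronous store (NOT the ComponentStore API).
class TinyStore<T> {
  private state: T;
  private readonly queued: boolean;
  private listeners: Listener<T>[] = [];
  private pending: Updater<T>[] = [];
  private draining = false;

  constructor(initial: T, queued: boolean) {
    this.state = initial;
    this.queued = queued;
  }

  // Replays the current state on subscribe, similar to a BehaviorSubject.
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.state);
  }

  update(fn: Updater<T>): void {
    if (!this.queued) {
      // Re-entrant mode: a nested update emits before the outer emission finishes.
      this.emit(fn(this.state));
      return;
    }
    this.pending.push(fn);
    if (this.draining) return; // a drain loop further up the stack will pick it up
    this.draining = true;
    try {
      let next: Updater<T> | undefined;
      while ((next = this.pending.shift()) !== undefined) {
        this.emit(next(this.state));
      }
    } finally {
      this.draining = false;
    }
  }

  private emit(state: T): void {
    this.state = state;
    // Every listener receives the value captured when this emission started.
    for (const listener of [...this.listeners]) listener(state);
  }
}

interface State { field1: boolean; field2: boolean }

function run(queued: boolean): string[] {
  const log: string[] = [];
  const store = new TinyStore<State>({ field1: false, field2: false }, queued);
  // Subscribed first: reacts to field1 by updating field2 (the anti-pattern).
  store.subscribe((s) => {
    if (s.field1 && !s.field2) store.update((prev) => ({ ...prev, field2: true }));
  });
  // Subscribed second: only observes.
  store.subscribe((s) => { log.push(`${s.field1},${s.field2}`); });
  store.update((prev) => ({ ...prev, field1: true }));
  return log;
}

const reentrantLog = run(false); // ["false,false", "true,true", "true,false"]
const queuedLog = run(true);     // ["false,false", "true,false", "true,true"]
```

In re-entrant mode the observer's last value is the stale `true,false`, which mirrors the reported output. In queued mode the last value matches the store.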

> In @ngrx/store this is resolved by using a queueScheduler.

Is it OK that, with this solution, the state value is wrong right after the second update call?

store.select(state => state.field1)
  .pipe(
    filter(field1 => field1),
    switchMap(() => {
      store.updater(state => ({ ...state, field2: true }))();

      return store.select(state => state.field2);
    })
  )
  .subscribe(field2 => console.log(field2))

will produce

true

even though the update was already called.
So an update call does not guarantee the correct store state immediately, and the update may only finish at some point in the future?
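
The concern can be illustrated with a small dependency-free sketch. `createQueuedStore` and its `get`, `subscribe`, and `update` members are hypothetical names for this example, and the queue is an assumed simplification of trampolined scheduling rather than the ComponentStore code. It shows that a read placed directly after a nested update call still sees the old value, because the update has only been enqueued.

```typescript
interface State { field1: boolean; field2: boolean }

// Hypothetical queued store: nested updates wait until the current one has
// been delivered to all listeners.
function createQueuedStore(initial: State) {
  let state = initial;
  let draining = false;
  const pending: Array<(s: State) => State> = [];
  const listeners: Array<(s: State) => void> = [];

  return {
    get: (): State => state,
    subscribe(listener: (s: State) => void): void {
      listeners.push(listener);
    },
    update(fn: (s: State) => State): void {
      pending.push(fn);
      if (draining) return; // deferred: the running drain loop applies it later
      draining = true;
      try {
        let next: ((s: State) => State) | undefined;
        while ((next = pending.shift()) !== undefined) {
          state = next(state);
          for (const listener of [...listeners]) listener(state);
        }
      } finally {
        draining = false;
      }
    },
  };
}

const store = createQueuedStore({ field1: false, field2: false });
const seenRightAfterUpdate: boolean[] = [];

store.subscribe((s) => {
  if (s.field1 && !s.field2) {
    store.update((prev) => ({ ...prev, field2: true }));
    // The nested update is still in the queue, so this read is stale.
    seenRightAfterUpdate.push(store.get().field2);
  }
});

store.update((prev) => ({ ...prev, field1: true }));
const finalField2 = store.get().field2; // true once the outer call returns
```

The state is consistent again once the outermost `update` call returns. Only code running inside a listener observes the gap.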

Thanks for bringing it to our attention.

Since all of these updates are happening within the same micro task, would you rather have this output without any intermediary emissions?

> field2 true
> state {field1: true, field2: true}

This is how we combine emissions from multiple Observables in selector. I wasn't putting a single value from the source under the same grouping, which I can adjust.

Thoughts?
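
Dropping intermediary emissions within one microtask can be sketched as follows. `coalesce` and its `schedule` parameter are hypothetical names for this illustration and do not reproduce any ComponentStore operator. The default scheduler uses a resolved promise to defer the flush to a microtask, and the usage below swaps in a manual scheduler so the behaviour can be checked synchronously.

```typescript
// Hypothetical helper: wraps a listener so that several synchronous calls
// result in a single call with the latest value.
function coalesce<T>(
  listener: (value: T) => void,
  schedule: (flush: () => void) => void = (flush) => {
    Promise.resolve().then(flush); // defer the flush to a microtask
  }
): (value: T) => void {
  let scheduled = false;
  let latest: T | undefined;
  return (value: T): void => {
    latest = value; // later values overwrite earlier ones
    if (scheduled) return;
    scheduled = true;
    schedule(() => {
      scheduled = false;
      listener(latest as T);
    });
  };
}

// Usage with a manual scheduler standing in for the microtask queue.
const tasks: Array<() => void> = [];
const seen: string[] = [];
const onState = coalesce<string>(
  (s) => { seen.push(s); },
  (flush) => { tasks.push(flush); }
);

onState("field1: true, field2: false"); // intermediary value
onState("field1: true, field2: true");  // latest value in the same batch
const scheduledFlushes = tasks.length;  // 1: only one flush was scheduled
for (const task of tasks.splice(0)) task();
// seen is now ["field1: true, field2: true"]
```

The trade-off is that listeners no longer run synchronously with the update, so any consumer that relies on immediate emissions would need to account for the delay.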

I've looked into this and created a few perf tests for different cases.

Here are the results of the tests.

ComponentStore version | Average timing to get the results
------------ | -------------
ComponentStoreDebounceSync | 0.6137999999998556
ComponentStoreExisting | 1.3654500000029657
ComponentStoreNoDebounceSync | 0.8695999999997639
ComponentStoreObserve | 0.848799999998846
ComponentStoreObserveAndDebounceSync | 0.5467000000010103

So the store with observeOn(queueScheduler) and debounceSync() for the single observable values during the selection is a clear winner.

I've done a lot more testing with a real Angular app - here is the data, which is a bit hard to read, but the results are similar.

Btw, the expected output would be the following:

> field2 true
> state {field1: true, field2: true}