RxJS: WebSocketSubject fails to recreate socket when close isn't immediate

Created on 18 Mar 2019 · 7 Comments · Source: ReactiveX/rxjs

Bug Report

Current Behavior
If closing a WebSocket takes some time, subsequent calls to multiplex fail to send any subscription messages, and the newly created socket is closed immediately. See the reproduction below.

Reproduction

import { interval } from "rxjs";
import { webSocket } from "rxjs/webSocket";
import { switchMap, take } from "rxjs/operators";

const socket = webSocket<any>({
  url: "ws://localhost/ws"
});

interval(2_000)
  .pipe(
    take(2),
    switchMap(id => {
      return socket.multiplex(
        () => `sub ${id}`,
        () => `unsub ${id}`,
        value => value === id
      );
    })
  )
  .subscribe({
    next: _ => { },
    error: err => console.error(err),
    complete: () => console.log("Complete")
  });

Expected behavior
A new call to multiplex should subscribe and stay open.

Environment

  • RxJS version: 6.4.0

All 7 comments

I see you put a lot of effort into this reproduction. Thank you for that. I'm a little strapped for time to dig into this, but on the surface, I'm concerned that this might be behaving as intended, as it's supposed to buffer messages until the new socket is open.

My use case here is that I have 2 tabs in a UI. When I change tabs, I unsubscribe from a stream, which starts to close the connection. The new tab makes a different subscription, but this fails because the close isn't complete.

My current workaround is to delay new subscriptions by 250ms, but this really depends on how long the connection takes to close.

I bumped into the exact same thing with a similar use case (switching between two routes).

I agree that the delay workaround works and that it depends on how long the connection takes to close.

I borrowed @markmcdowell's StackBlitz to reproduce how it behaves with the delay set to 1s.

https://stackblitz.com/edit/rxjs-nps9v1?file=index.ts

Any luck with this?

@benlesh @cartant I am also having the exact same issue. Basically, what seems to be happening is that the way the WebSocketSubject closes and reconnects sockets is very dependent on the JavaScript stack.

In my case, we are subscribing and unsubscribing during an Angular route change. After doing a deep dive into the WebSocketSubject, here is what is happening.

  1. The new route navigates and runs its resolvers before the ngOnDestroy method is run, so we get some new subscriptions before the unsubscribe occurs.
  2. Since the multiplex method uses a closure with an Observable, the unsubscribe method is then called before the new subscriptions have been added to the _output subject (see lines 367-375 of WebSocketSubject.ts). Because a subject already existed when the subscription was initially made, these subscriptions are not buffered.
  3. This kicks off a close on the existing socket.
  4. The new subscriptions are then added to the subject (see lines 222-249).
  5. When the subscription is added, the subject sees that it doesn't have a connection (this is before the socket is actually closed) and tries to make a new connection.
  6. Before the new socket opens, the old socket fires onclose (see lines 334-345), which resets the state (this has already been done by the unsubscription on lines 367-375).
  7. Then the new socket opens, and when it checks the value of this._socket on line 279, it has been reset to null, so it closes the new socket.
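
The ordering in steps 3 to 7 can be replayed with a small stand-alone model. This is only a sketch: FakeSocket, ModelSubject, releaseLastSubscriber and runRace are hypothetical names invented for illustration, not rxjs APIs, and the handlers copy only the behavior described in the steps above.

```typescript
// Hypothetical stand-in for a browser WebSocket. Nothing here is asynchronous:
// the driver calls onopen/onclose by hand to control the event order.
class FakeSocket {
  onopen: () => void = () => {};
  onclose: () => void = () => {};
  closeRequested = false;

  // Only records the request; the close event itself arrives later.
  close(): void {
    this.closeRequested = true;
  }
}

// Hypothetical, heavily simplified model of the subject's socket bookkeeping.
class ModelSubject {
  current: FakeSocket | null = null;
  readonly log: string[] = [];

  connect(name: string): FakeSocket {
    const socket = new FakeSocket();
    this.current = socket;
    socket.onopen = () => {
      if (this.current === null) {
        // Step 7: the state was reset in the meantime, so the socket is closed.
        socket.close();
        this.log.push(`${name}: closed on open`);
        return;
      }
      this.log.push(`${name}: open`);
    };
    socket.onclose = () => {
      // Step 6: the close event of any socket resets the shared state.
      this.current = null;
      this.log.push(`${name}: onclose reset state`);
    };
    return socket;
  }

  // Steps 2-3: the last subscriber leaves, so request a close and reset.
  releaseLastSubscriber(): void {
    if (this.current !== null) {
      this.current.close();
    }
    this.current = null;
  }
}

function runRace(): string[] {
  const subject = new ModelSubject();
  const oldSocket = subject.connect("old");
  oldSocket.onopen();
  subject.releaseLastSubscriber(); // close has started but not finished
  const newSocket = subject.connect("new"); // steps 4-5
  oldSocket.onclose(); // step 6: late close event from the old socket
  newSocket.onopen(); // step 7
  return subject.log;
}

console.log(runRace());
// logs: old: open, old: onclose reset state, new: closed on open
```

In this model the new socket is closed purely because the old socket's close event lands between the new connection attempt and its open event.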

I have been considering different approaches to fixing this. One would be to have some kind of buffer count that gets incremented when the multiplex method is called which then is checked on line 369 before the socket is closed and reset. This would allow that code to know if there were any new subscriptions to be staged.
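
A rough sketch of that counter idea, under the assumption that the count goes up when a multiplexed stream is requested and down once its subscription reaches the subject. CountingSubject, announce, attach, detach and switchTabs are hypothetical names, and closeCalls merely stands in for the socket close plus state reset.

```typescript
// Hypothetical model of the "pending counter" idea; not rxjs code.
class CountingSubject {
  pending = 0; // streams requested but not yet subscribed
  observers = 0; // subscribers currently attached
  closeCalls = 0; // stands in for socket.close() plus the state reset

  // Called when a multiplexed stream is requested.
  announce(): void {
    this.pending += 1;
  }

  // Called when that stream's subscription actually reaches the subject.
  attach(): void {
    this.pending -= 1;
    this.observers += 1;
  }

  // Teardown for one subscriber: close only if nobody is attached or waiting.
  detach(): void {
    this.observers -= 1;
    if (this.observers === 0 && this.pending === 0) {
      this.closeCalls += 1;
    }
  }
}

// Replays a tab switch and returns how many times the socket was closed.
function switchTabs(announceBeforeTeardown: boolean): number {
  const subject = new CountingSubject();
  subject.announce();
  subject.attach(); // first tab is subscribed
  if (announceBeforeTeardown) {
    subject.announce(); // second tab requested before the first is torn down
  }
  subject.detach(); // first tab unsubscribes
  if (!announceBeforeTeardown) {
    subject.announce(); // second tab requested only after teardown
  }
  subject.attach(); // second tab's subscription arrives
  return subject.closeCalls;
}

console.log(switchTabs(true), switchTabs(false)); // prints: 0 1
```

When the second request is counted before the teardown runs, the socket is kept and reused; otherwise the model closes it exactly as before.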

Another approach to fixing this might be to set a property on the class (like _socketOpening) when a socket is opened via _connectSocket to true and check for that in socket.onclose and not call resetState if that value is true. Then once socket.onopen is called, you would set that value to false.
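
The flag approach can be sketched against a simplified model of the race. FakeSocket, GuardedSubject, releaseLastSubscriber and runGuarded are hypothetical names, and socketOpening plays the role of the proposed _socketOpening property; none of this is the actual rxjs implementation.

```typescript
// Hypothetical stand-in for a browser WebSocket; events are fired by hand.
class FakeSocket {
  onopen: () => void = () => {};
  onclose: () => void = () => {};
  closeRequested = false;

  close(): void {
    this.closeRequested = true;
  }
}

// Hypothetical model of the subject with the proposed guard added.
class GuardedSubject {
  current: FakeSocket | null = null;
  socketOpening = false; // true from connect() until onopen fires
  readonly log: string[] = [];

  connect(name: string): FakeSocket {
    const socket = new FakeSocket();
    this.current = socket;
    this.socketOpening = true;
    socket.onopen = () => {
      this.socketOpening = false;
      if (this.current === null) {
        socket.close();
        this.log.push(`${name}: closed on open`);
        return;
      }
      this.log.push(`${name}: open`);
    };
    socket.onclose = () => {
      if (this.socketOpening) {
        // Guard: a socket is being opened, so skip the state reset.
        this.log.push(`${name}: onclose ignored`);
        return;
      }
      this.current = null;
      this.log.push(`${name}: onclose reset state`);
    };
    return socket;
  }

  // The last subscriber leaves: request a close and reset the state.
  releaseLastSubscriber(): void {
    if (this.current !== null) {
      this.current.close();
    }
    this.current = null;
  }
}

function runGuarded(): { log: string[]; newSocketClosed: boolean } {
  const subject = new GuardedSubject();
  const oldSocket = subject.connect("old");
  oldSocket.onopen();
  subject.releaseLastSubscriber(); // close has started but not finished
  const newSocket = subject.connect("new");
  oldSocket.onclose(); // late close event, now ignored by the guard
  newSocket.onopen();
  return { log: subject.log, newSocketClosed: newSocket.closeRequested };
}

console.log(runGuarded().log);
// logs: old: open, old: onclose ignored, new: open
```

One limitation of this sketch: if the new socket itself failed while opening, its own onclose would also skip the reset, so a real fix would likely need to tell the two sockets apart.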

What do you think? I am happy to make a PR for either of these solutions or a solution that you propose.

Any update on this as an issue? I have had to implement both solutions in my code to fix the issues outlined in my post above.

Another approach to fixing this might be to set a property on the class (like _socketOpening) when a socket is opened via _connectSocket to true and check for that in socket.onclose and not call resetState if that value is true. Then once socket.onopen is called, you would set that value to false.

I ran into the same issue and I also used this fix to solve my problem.
