Apollo-android: Apollo subscription only onConnected() is called

Created on 18 Dec 2019 · 33 Comments · Source: apollographql/apollo-android

I have a subscription to my server, and it connects successfully: the onConnected() method is called.
But when I upload something to the server, no other subscription callback is invoked.
This is my Apollo client configuration:

private const val webSocketUrl = "ws://12.34.56.7:3000/graphql"
private val okHttp = OkHttpClient
      .Builder()
      .addInterceptor(interceptor)
      .retryOnConnectionFailure(true)
      .pingInterval(30, TimeUnit.SECONDS)
      .build()

private val transportFactory = WebSocketSubscriptionTransport.Factory(webSocketUrl, okHttp)
val apolloSocketClient: ApolloClient = ApolloClient.builder()
       .okHttpClient(okHttp)
       .serverUrl(BuildConfig.BASE_URL)
       .subscriptionTransportFactory(transportFactory)
       .build()

And this is the invocation:

val mySubscriptionCall = MySubscription("MY_TOPIC")
subscription = apolloSocketClient.subscribe(mySubscriptionCall)

   subscription.execute(object : ApolloSubscriptionCall.Callback<MySubscription.Data>{
        override fun onFailure(e: ApolloException) {
          Log.e("SUBSCRIPTION_","onFailure",e)
        }

        override fun onResponse(response: Response<MySubscription.Data>) {
          Log.e("SUBSCRIPTION_","onResponse ${response.data()}")
        }

        override fun onConnected() {
          Log.e("SUBSCRIPTION_","onConnected")
        }

        override fun onTerminated() {
          Log.e("SUBSCRIPTION_","onTerminated")
        }

        override fun onCompleted() {
          Log.e("SUBSCRIPTION_","onCompleted")
        }

      })


I'm only getting onConnected

D/OkHttp: <-- 101 Switching Protocols http://12.34.56.7:3000/graphql (180ms)
D/OkHttp: Upgrade: websocket
D/OkHttp: Connection: Upgrade
D/OkHttp: Sec-WebSocket-Accept: QG+xxYzpdn9O2QUYMH0ZDKaUH1U=
D/OkHttp: Sec-WebSocket-Protocol: graphql-ws
D/OkHttp: <-- END HTTP
E/SUBSCRIPTION_: onConnected
Version
apollo lib version 1.2.2

Bug

All 33 comments

I don't have my project/code available to me at the moment, but I ran into the same issue a few weeks ago.

I ended up fixing it by implementing my own WebSocket transport class. Off the top of my head, my suspicions as to the potential origins of the bug are:

1) Something having to do with Kotlin<->Java usage.
2) AtomicReference being used to wrap the WebSocket object.
3) Some kind of scoping issue.

I just implemented the required interface as an anonymous object that wraps the OkHttp3 WebSocket object internally. It's pretty straightforward.

@xorander00 could you pls share your implementation of web socket transport?

@developer-- is it still a valid issue on your side?

@xorander00 @developer-- @sav007 - i also have this issue with a Java app using apollo-android talking to AWS AppSync, plz can you give more details with your solution?

A snippet of my code, as requested, is below. I think this is the first version that I wrote, but I believe I have a newer revision somewhere with better handling and cleaner code. Anyway, feel free to improve upon it.

fun apolloSubTransportFactory(): SubscriptionTransport.Factory {
     return SubscriptionTransport.Factory { callback ->
        val wsListener = object : WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: okhttp3.Response) {
                callback.onConnected()
                super.onOpen(webSocket, response)
            }

            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                callback.onClosed()
                super.onClosed(webSocket, code, reason)
            }

            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                callback.onClosed()
                super.onClosing(webSocket, code, reason)
            }

            override fun onFailure(webSocket: WebSocket, t: Throwable, response: okhttp3.Response?) {
                Logger.d("WebSocketListener::onFailure: ${t.localizedMessage}")
                callback.onFailure(t)
                super.onFailure(webSocket, t, response)
            }

            override fun onMessage(webSocket: WebSocket, text: String) {
                callback.onMessage(OperationServerMessage.fromJsonString(text))
                super.onMessage(webSocket, text)
            }

            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                super.onMessage(webSocket, bytes)
            }
        }

        object : SubscriptionTransport {
            fun req(): Request {
                val json = sharedPrefs.get(KeyValModel.Key.USER_AUTH.value, "")
                return try {
                    if (json.isNotBlank()) {
                        val model = UserAuthModel.fromJsonString(json)
                        Request.Builder()
                            .url(BuildConfig.netGraphQLUrlWss)
                            .addHeader("Authorization", "Bearer ${model.tokenAccess ?: ""}")
                            .addHeader("X-Hasura-Role", model.userRole.value)
                            .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                            .addHeader("Cookie", "")
                            .build()
                    } else {
                        Logger.d("UserAuthModel JSON was empty, skipping...")
                        Request.Builder()
                            .url(BuildConfig.netGraphQLUrlWss)
                            .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                            .addHeader("Cookie", "")
                            .build()
                    }
                } catch (t: Throwable) {
                    Logger.e(t.localizedMessage ?: "(NULL)")

                    Request.Builder()
                        .url(BuildConfig.netGraphQLUrlWss)
                        .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                        .addHeader("Cookie", "")
                        .build()
                }
            }

            val ws: WebSocket by lazy { okHttpClient.newWebSocket(req(), wsListener) }

            override fun connect() {
                //Logger.d("SubscriptionTransport.connect(): Called, connecting WebSocket...")
                ws
            }

            override fun disconnect(message: OperationClientMessage?) {
                //Logger.d(message?.toJsonString())
                ws.close(1001, message?.toJsonString())
            }

            override fun send(message: OperationClientMessage?) {
                if (message != null) {
                    //Logger.d(message.toJsonString())
                    ws.send(message.toJsonString())
                } else {
                    Logger.w("SubscriptionTransport.send(message): message was null")
                }
            }
        }
    }
}

@xorander00 - ok, i couldn't get it to work, but thanks.

@sav007 - i have debugged this some more though with version 1.3.2. if i create the client like this:

  final OkHttpClient okHttpClient = new OkHttpClient.Builder()
                // [proxy and authentication here]
                .addInterceptor(new TestInterceptor())
                .build();
  final ApolloClient client = ApolloClient.builder()
    .serverUrl(API_URL)
    .okHttpClient(okHttpClient)
    .subscriptionHeartbeatTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS)
    .subscriptionTransportFactory(new WebSocketSubscriptionTransport.Factory(SUB_URL, okHttpClient))
    .build();
  client.addOnSubscriptionManagerStateChangeListener(new TestOnSubscriptionManagerStateChangeListener());
  client.subscribe(new OnCreateTestSubscription())
    .execute(new TestApolloSubscriptionCallback<>());
  Thread.sleep(TimeUnit.SECONDS.toMillis(60));

where okHttpClient is connecting to AWS AppSync and the TestInterceptor, TestOnSubscriptionManagerStateChangeListener and TestApolloSubscriptionCallback classes just log in their implemented methods, then i get this:

2020-02-21T14:32:17.611 onStateChange(fromState, toState) fromState = DISCONNECTED, toState = CONNECTING
2020-02-21T14:32:17.611 intercept(chain) call = okhttp3.RealCall@7798704a, request = Request{method=GET, url=https://***.appsync-realtime-api.us-east-1.amazonaws.com/graphql, tags={}}
2020-02-21T14:32:18.241 onConnected()
2020-02-21T14:32:18.241 onStateChange(fromState, toState) fromState = CONNECTING, toState = CONNECTED
2020-02-21T14:32:18.253 onStateChange(fromState, toState) fromState = CONNECTED, toState = DISCONNECTED

which suggests that it successfully connects, as per the OP, but then immediately disconnects.

putting a breakpoint on the onStateChange() shows that it is called because RealSubscriptionManager.SubscriptionTransportCallback#onMessage() gets an OperationServerMessage.ConnectionError message whose payload is {errors: [{errorType: BadRequest, errorCode: 400}]}.

i can use the okHttpClient to make queries and mutations, so i assume it's otherwise configured ok.

any ideas?

@simonmarshall I'm using Hasura and that snippet was simply a copy/paste, so it's probably fairly specific to my case. It would definitely require tweaking, especially if using something like AppSync since I didn't have that in mind.

One of my suspicions is the WebSocket handshake, including headers, might be a source of the issue. I think I remember seeing in my server logs that it was missing the cookie and/or protocol headers, which results in the same behavior you mention (Connect and then immediate disconnect).

I'll try to revisit this sometime and possibly submit a PR with a proper refactoring/fix, but I'm a bit short on time currently so it might have to wait unfortunately.

EDIT: Sorry, posted using a different github account by mistake. Deleted previous response and replaced it with this one.

hi all - plz find attached how to reproduce this broken subscription issue with AWS AppSync, and let me know if you need any more info

apollo-android-issues-1864.pdf
apollo-android-issues-1864.zip

I am also having the exact same issue, so any solution would be helpful.

@rashid56156 - with appsync or some other backend?

@simonmarshall with AppSync.

ok, hopefully @sav007 or @martinbonnin can take a look at the issue reproduction code soon...

I don't use AppSync, so I can't give much direction on it, but in my case looking at the server logs helped.

That's where I saw that my Android client was able to successfully complete the handshake (HTTP 101 Switching Protocol to Upgrade), but then the server was closing the connection right after that because some header was missing.

Once I explicitly set "Sec-WebSocket-Protocol" & "Cookie" again, even if they were empty, it worked.

EDIT: Oh, and I'd also probably check that the ETAG value is getting through fine, though I'm not sure if it's necessary.

@simonmarshall i finally gave up on Apollo client and replaced it with AWSAppSyncClient which is a wrapper of Apollo Client.

Now my subscriptions are working fine.

@xorander00 - i can see the existing code adding those headers in WebSocketSubscriptionTransport.Factory, and i have verified they're in the payload along with others added by RealWebSocket#connect when the ApolloClient negotiates (apparently successfully) the upgrade to a web socket. i don't see any ETag value though, or there could be some other header missing. (if i put a bp on WebSocketSubscriptionTransport#send, i can see that the ApolloClient then sends a message {"type":"connection_init"} via the web socket. then i see WebSocketSubscriptionTransport.WebSocketListener#onMessage is called with {"type":"connection_error","payload":{"errors":[{"errorType":"BadRequest","errorCode":400}]}}. this happens even if i arrange not to send that init message, so i'm not sure what triggers the message from the server. though it seems odd it should agree to the request to upgrade to a websocket, but then shortly after decide a header was missing on the upgrade request and send the error.)

@rashid56156 - thanks for the pointer, but my company wishes to steer clear of it for the moment.

hmmm, it's weird that the apolloclient would get a successful _101 Switching Protocols_ response from appsync when using the URL without any query parameters, and only then get a _400 Bad Request_ message shortly after

@martinbonnin in case of AppSync, AWS sdk takes care of creating and passing subscription url from the original graphQL url plus some other headers as well.

On the other hand, with Apollo we have to pass a separate subscription (WebSocket, wss://) URL, so I am guessing we are missing some required headers here. The Apollo client accepts subscriptionConnectionParams, through which I tried to pass auth and cookie headers, but that didn't work either.

if u run the test case i attached u will see that the TestAuthenticator is never invoked by the ApolloClient for the subscription connection and upgrade to websocket. (it is invoked when it does the query and mutation tests.) so, the ApolloClient never adds the x-api-key header to the subscription connection request. it only adds Sec-WebSocket-Protocol, Cookie and the upgrade headers.

however, if i change the ApolloClient to use an Interceptor, rather than Authenticator, to explicitly add the x-api-key header to the request, i still get the same subscription connection issue. so, it's not just a matter of a missing auth header. perhaps all headers (including x-api-key) do need to be encoded and passed as a query parameter for the connection. still, i don't get why the connection upgrade initially succeeds with a 101, if it's lacking auth.

@rashid56156 - looking at AWSAppSyncRealTimeProvider.ts#L676, amplify seems to be sending the same connection_init message payload as apollo. so, perhaps it's nothing to do with subscriptionConnectionParams, since those are for adding to message payloads sent over the socket, not adding to the original connection header. (and appsync sends a connection_error regardless of whether the client sends a connection_init.)

@martinbonnin - yes the connection issue is with the query params. this comment shows what the http call query params should be for AppSync. the comment below shows what the websocket subscription start message should look like for AppSync. the subscription url should look like this:

wss://API_HOST.appsync-realtime-api.API_REGION.amazonaws.com/graphql?header=HEADER&payload=e30=

where API_HOST is the API-specific host part and HEADER is base64 encoded json of the form:

{"host": "API_HOST.appsync-api.API_REGION.amazonaws.com", "x-amz-date": "DATETIME", "x-api-key": "API_KEY"}

where DATETIME is in the format "yyyyMMdd'T'HHmmss'Z'" (see here and here) and API_KEY is the API key if that is the authz mode. note that the header host uses appsync-api, whereas the url host uses appsync-realtime-api.
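For anyone who wants to try the URL format described above, here is a minimal sketch in plain Java (standard library only). The class and method names are made up for illustration, the API key authz mode is assumed, and the Base64 values are appended to the query string exactly as shown in the thread (not percent-encoded), which may need adjusting for your setup.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

// Hypothetical helper, not part of apollo-android or any AWS SDK.
final class AppSyncRealtimeUrl {
    // Timestamp pattern quoted in the thread, rendered in UTC.
    private static final DateTimeFormatter AMZ_DATE =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);

    public static String amzDate(Instant now) {
        return AMZ_DATE.format(now);
    }

    // The header JSON uses the appsync-api host, not the realtime host.
    public static String headerJson(String apiHost, String region, String amzDate, String apiKey) {
        String host = apiHost + ".appsync-api." + region + ".amazonaws.com";
        return "{\"host\":\"" + host + "\","
                + "\"x-amz-date\":\"" + amzDate + "\","
                + "\"x-api-key\":\"" + apiKey + "\"}";
    }

    // The URL itself uses the appsync-realtime-api host.
    public static String realtimeUrl(String apiHost, String region, String amzDate, String apiKey) {
        Base64.Encoder b64 = Base64.getEncoder();
        String header = b64.encodeToString(
                headerJson(apiHost, region, amzDate, apiKey).getBytes(StandardCharsets.UTF_8));
        // An empty JSON object "{}" encodes to "e30=", matching the payload value above.
        String payload = b64.encodeToString("{}".getBytes(StandardCharsets.UTF_8));
        return "wss://" + apiHost + ".appsync-realtime-api." + region
                + ".amazonaws.com/graphql?header=" + header + "&payload=" + payload;
    }
}
```

The resulting string would be what gets passed as the subscription URL; the API key and host values are placeholders you would supply yourself.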

@martinbonnin - ok, i figured it out with the help of AWS support. the comment above shows what the http call query params should be for AppSync. this comment shows what the websocket subscription start message should look like for AppSync:

{
  "id": "AUTO_ID",
  "type": "start",
  "payload": {
    "data": "{\"variables\":{...},\"query\":\"subscription ...\"}",
    "extensions": {
      "authorization": {
        "host": "API_HOST.appsync-api.API_REGION.amazonaws.com",
        "x-amz-date": "DATETIME",
        "x-api-key": "API_KEY"
      }
    }
  }
}

where the data value is a json string (not json) and the extensions value is json (not a json string). yes, really. see amplify's WebSocketConnectionManager#requestSubscription() here.

the authorization json is the same as that encoded in the header query parameter, which varies depending on the authz mode.
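To make the "JSON string inside JSON" point concrete, here is a rough sketch that builds the start message by hand in plain Java. The class name, method names, and the example operation are hypothetical; a real client would more likely use a JSON library. The only thing it tries to show is that data is escaped and embedded as a string, while authorization is embedded as a raw JSON object.

```java
// Hypothetical helper, not part of apollo-android or any AWS SDK.
final class AppSyncStartMessage {
    // Escapes a string so it can sit inside a JSON string literal.
    public static String escapeJson(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex escape form.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    // variablesJson and authorizationJson must already be valid JSON objects.
    public static String startMessage(String id, String query,
                                      String variablesJson, String authorizationJson) {
        // Inner document: an ordinary JSON object holding variables and query.
        String data = "{\"variables\":" + variablesJson
                + ",\"query\":\"" + escapeJson(query) + "\"}";
        // Outer message: data goes in as an escaped string, authorization as raw JSON.
        return "{\"id\":\"" + escapeJson(id) + "\",\"type\":\"start\",\"payload\":{"
                + "\"data\":\"" + escapeJson(data) + "\","
                + "\"extensions\":{\"authorization\":" + authorizationJson + "}}}";
    }
}
```

Calling startMessage with the id the client tracks, the subscription text, "{}" for variables, and the same authorization JSON used for the header query parameter should give a message of the shape shown above.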

if i arrange for my implementation of WebSocketSubscriptionTransport#send() to send the above, i do get back a subscription event over the socket:

{"type":"data","id":"AUTO_ID","payload":{"data":{...}}}

(although since i have to fake the AUTO_ID value, such that it doesn't match what the ApolloClient has generated internally to track an event to its subscription, my subscription callback doesn't get called.)
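The id mismatch can be illustrated with a toy router in plain Java. This is not how apollo-android is implemented internally (the class and method names here are invented); it only sketches the general idea that incoming data messages are matched to callbacks by the id used in the start message, so an event carrying an unknown id is silently dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical illustration of routing subscription events by id.
final class SubscriptionRouter {
    private final Map<String, Consumer<String>> handlersById = new ConcurrentHashMap<>();

    // Remember which callback belongs to the id sent in the start message.
    public void register(String id, Consumer<String> onData) {
        handlersById.put(id, onData);
    }

    // Returns true if a callback was found for the id, false if the event was dropped.
    public boolean dispatch(String id, String payloadJson) {
        Consumer<String> handler = handlersById.get(id);
        if (handler == null) {
            return false;
        }
        handler.accept(payloadJson);
        return true;
    }
}
```

With a callback registered under the client's generated id, a data message that comes back with a faked id such as "AUTO_ID" finds no handler, which matches the behaviour described above.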

as it currently stands, the ApolloClient can't support AppSync subscriptions, but hopefully this at least explains what is needed.

@simonmarshall AWS published a blog post on how to use AppSync subscriptions with pure WebSocket libraries and confirms some of your findings : https://aws.amazon.com/blogs/mobile/appsync-websockets-python/
https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html

Hello @martinbonnin @simonmarshall @rashid56156

I've been able to use Apollo subscription with AppSync thanks to this thread, if anyone is interested I can either share in a gist or make a PR. Let me know because I would need to clean the code

hi @Fgabz - yeah for sure i'd be interested! - thanks in advance

@simonmarshall https://gist.github.com/Fgabz/751ef0b4cc7b5ce3dfa0310cded74dd0

Feel free to comment, it might have some issues. I don't know if we can add it to the repo.

Hello everyone!

I am having exactly the same issue on Android (Java) using Apollo subscriptions. Any new solution/workaround about this?

Same here, using a Django backend, subscriptions work from GraphiQL so server should be fine.
Only onConnected() is called, as the issue title suggests.
Using Android with Kotlin.
Wondering how this is not fixed yet, it seems like almost one year passed since its discovery.

@Halex193 What hosting are you using?

I'm running it locally in a docker container for the moment.

@theodhorpandeli are you using AppSync? Could the suggestion above help? Or is it another backend?

@Halex193 is there a sample Django server/DockerFile that can be used to reproduce the issue?

I'm going to be re-visiting the Android app this weekend where I used subscriptions. I'll check my code and see what I did to fix the issue in my case. Keep in mind though that I'm not using AWS AppSync, I'm using a local Hasura instance for my GraphQL server.

Also, my project is in Kotlin, not Java. Kotlin/Java Interop could potentially be a factor, although I'm not sure how likely. I do remember using the Java implementation as reference and re-implementing it in Kotlin for use in my app. Once I did that, it worked fine.

Hello guys! We actually solved the issue; it was related to the connection parameters that the Apollo client should send. The backend was configured to expect a JWT under an "Authorization" header, but the client wasn't sending it. The result was an undefined error, and the connection would drop. We managed to handle the absence of the JWT, and then it worked fine. So basically, it was just an error caused by a wrong JWT implementation.

This endpoint helped us (me and my backend developer friend) to localise the error: https://3wqzw.sse.codesandbox.io/graphql

Map<String, Object> connectionParams = new HashMap<>();
connectionParams.put("Authorization", "Bearer " + token);

 apolloClient = ApolloClient
                .builder()
                .okHttpClient(okHttpClient)
                .serverUrl(Constants.SERVER_URL)
                .subscriptionConnectionParams(new SubscriptionConnectionParams(connectionParams))
                .subscriptionTransportFactory(new WebSocketSubscriptionTransport.Factory(Constants.SOCKET_URL, okHttpClient))
                .build();

Everything is working fine now.

Unfortunately my team has quickly moved on to using a raw websocket implementation for this project, but I'll test it again on the old version when I get the chance.

Everything is working fine now.

That is awesome, I hope that others who have this issue will find this fix.

I still believe this should either be documented or fixed by calling onFailure() (just my 2 cents)

I should have posted this much earlier. Here is how I solved the problem:

object ApolloWebSocketClient {
    fun getApolloSocketClient(token: String): ApolloClient {
      val okHttp =
        OkHttpClient
          .Builder()
          .addInterceptor(interceptor)
          .build()
      // Per the discussion above, these params go into the connection_init message payload,
      // not into the HTTP headers of the WebSocket upgrade request.
      val headers = mapOf(
        "authorization" to "Bearer $token"
      )
      val transportFactory = WebSocketSubscriptionTransport.Factory(BuildConfig.SOCKET_URL, okHttp)
      val apolloSocketClient: ApolloClient = ApolloClient.builder()
        .okHttpClient(okHttp)
        .serverUrl(BuildConfig.BASE_URL)
        .subscriptionConnectionParams(SubscriptionConnectionParams(headers))
        .subscriptionTransportFactory(transportFactory)
        .build()
      return apolloSocketClient
    }
  }