Graphql-js: Context lifecycle in subscriptions

Created on 1 Jun 2017  ·  10 comments  ·  Source: graphql/graphql-js

After reading through the codebase, my understanding of the subscription implementation is as follows:

  • subscribe() is called when a subscription operation hits the server
  • subscribe() returns an AsyncIterator
  • that AsyncIterator yields a stream of results from subsequent calls to execute()

Again, if my understanding is correct, contextValue is passed to subscribe on the initial subscription operation and then passed down to execute whenever an event triggers a re-execution.

In short, contextValue is set once when a subscription operation starts and remains the same until the client unsubscribes.
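Roughly, this lifecycle can be sketched with a plain async generator. All names below are illustrative stand-ins, not the actual graphql-js implementation:

```javascript
// Sketch: subscribe() captures contextValue once, then every incoming
// event re-executes with that same captured object.
async function* subscribe(events, contextValue) {
  for await (const event of events) {
    // execute() stands in for graphql-js's execute(); it always receives
    // the contextValue captured at subscribe time.
    yield execute(event, contextValue);
  }
}

function execute(event, contextValue) {
  return { event, context: contextValue };
}

async function* fakeEvents() {
  yield 'a';
  yield 'b';
}

async function demo() {
  const ctx = { requestId: 1 };
  const seen = [];
  for await (const result of subscribe(fakeEvents(), ctx)) {
    seen.push(result.context === ctx); // same object on every execution
  }
  return seen;
}
```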

A common graphql-js implementation is to use DataLoader as a batching/caching library. For queries and mutations, a set of fresh DataLoader instances is usually bound to contextValue on a per-request basis. When the query/mutation is done, those DataLoader instances are left for garbage collection.

Now, a subscription's context has a much longer lifespan, making DataLoader's caching behaviour unwarranted in most cases.
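A toy sketch of the staleness problem, with a Map-backed cache standing in for DataLoader's per-instance cache (all names are illustrative, not DataLoader or graphql-js APIs):

```javascript
// Fake backing store.
const db = new Map([['user:1', { name: 'Ada' }]]);

// A "loader" with a cache that lives as long as the loader itself.
function makeLoader() {
  const cache = new Map();
  return key => {
    if (!cache.has(key)) cache.set(key, db.get(key));
    return cache.get(key);
  };
}

// Query/mutation pattern: a fresh loader per request sees fresh data.
const perRequest = () => makeLoader()('user:1').name;

// Subscription pattern: one loader for the subscription's whole lifetime.
const subscriptionLoader = makeLoader();
const first = subscriptionLoader('user:1').name;  // 'Ada'
db.set('user:1', { name: 'Grace' });              // data changes between events
const second = subscriptionLoader('user:1').name; // still 'Ada': stale cache
```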

Ideally, I guess, providing a way to set contextValue per execute() call would allow behaviour that matches what one would expect from setting contextValue on a per-request basis for queries and mutations.

Is there any way to do so in the current implementation? Is this a behaviour that could be implemented in graphql-js, or should I settle for one context per subscription operation and work my way around that?


All 10 comments

I had the same question in mind and peeked at the code. It seems we could add a makeExecutionContext(params.context) and use it to get an updated context before executing the query.

I think it could be hooked here and should be a simple change.

What do you think?

@kouak Have you made any progress on this? Have you found a work-around?

Sorry, no progress on this.

My use case involves DataLoader, my workaround is to disable response caching and only leverage DataLoader batching abilities in the subscription context.

As far as I understand, your proposed implementation would not work.

My understanding of the subscriptions-transport-ws implementation is as follows:

  • A subscription message arrives on the wire
  • this.subscribe is called here, returning an AsyncIterable. This step sets a contextValue when a subscription is started and hands control to graphql-js
  • subscription-transport-ws then loops over async values returned by graphql-js here

If my understanding is correct, a graphql-js consumer has no way of modifying contextValue once subscribe has been called.

Maybe we can modify this: https://github.com/graphql/graphql-js/blob/master/src/subscription/subscribe.js#L149

import assign from 'lodash/assign';
import { execute, createSourceEventStream } from 'graphql';
// Note: mapAsyncIterator and reportGraphQLError are internal graphql-js
// helpers (src/subscription), not part of the public 'graphql' exports.

function subscribe(
  schema,
  document,
  rootValue,
  contextValue,
  variableValues,
  operationName,
  fieldResolver,
  subscribeFieldResolver,
) {
  const sourcePromise = createSourceEventStream(
    schema,
    document,
    rootValue,
    contextValue,
    variableValues,
    operationName,
    subscribeFieldResolver,
  );

  const mapSourceToResponse = payload =>
    execute(
      schema,
      document,
      payload.data,
      assign({}, contextValue, payload.contextValue), // this line
      variableValues,
      operationName,
      fieldResolver,
    );

  return sourcePromise.then(
    sourceStream =>
      mapAsyncIterator(sourceStream, mapSourceToResponse, reportGraphQLError),
    reportGraphQLError,
  );
}

FWIW, here's how I solved this:

  • for each request, put a new dataloader in a bag of dataloaders
  • Pass in the bag of dataloaders to the context instead of a single dataloader
  • When your mutation publishes something, include the dataloader ID you're working with
  • When the subscription resolves the payload, use the dataLoader ID that was passed to it.

Extra details:

  • the dataloaders created by the subscription should have cache: false; that way, if a subscriber doesn't reuse the mutation's dataloader, it doesn't store stuff for the life of the subscription
  • if the authorization logic for the mutation _payload_ is more strict than the subscription _payload_, flush it before sharing it or just don't share it
  • dispose of the subscription dataloader when you call asyncIterator.return().
  • dispose of the mutation dataloader after a couple of seconds. Trying to clear it after the last subscriber gets their message is VERY difficult unless you want to write your own pubsub engine
  • this only works when all subscribers are on the same node, so be smart about stateful socket servers and stateless graphql servers, and consider moving to Redis when the time comes. Worst-case scenario: subscribers on a different worker don't use the cached value.
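The steps above could be sketched roughly like this. All names and the grace-period value are illustrative, not a real pubsub or DataLoader API:

```javascript
// "Bag of dataloaders": the mutation registers its loader under an ID,
// publishes that ID with the event, and the subscription resolver reuses
// the same loader if it's available on this node.
const loaderBag = new Map();
let nextLoaderId = 0;

function createLoader() {
  const cache = new Map();
  return { cache, load: key => cache.get(key) };
}

function runMutation(publish) {
  const loaderId = String(nextLoaderId++);
  const loader = createLoader();
  loader.cache.set('user:1', { name: 'Ada' }); // cache warmed during the mutation
  loaderBag.set(loaderId, loader);
  // publish the loader ID alongside the event payload
  publish({ loaderId, key: 'user:1' });
  // dispose after a grace period, since clearing exactly after the last
  // subscriber is hard without a custom pubsub engine
  setTimeout(() => loaderBag.delete(loaderId), 2000).unref?.();
}

function resolveSubscription(payload) {
  // reuse the mutation's loader if it's on this node and still alive,
  // otherwise fall back to a fresh (empty) one
  const loader = loaderBag.get(payload.loaderId) ?? createLoader();
  return loader.load(payload.key);
}
```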

I have a function called buildDataLoaders() which returns an object keyed with the names of all the dataloaders I need, I call this the loaderBucket.

When I get a query or mutation over http then I call buildDataLoaders() to create a new loaderBucket and shove it in the context for the resolvers, the resolvers then use object destructuring to pull out the dataloaders they need.

Socket connections for subscriptions are more difficult.

In the end I called buildDataLoaders() when the subscription starts, and implemented a flush() function which lives in the loaderBucket. flush() calls buildDataLoaders() again to create a fresh set of dataloaders, then clobbers the loaderBucket with them.

In the subscription's resolve function I call flush(), so it has a fresh set of dataloaders for each event sent out. This way I can use the caching functionality of DataLoader in subscriptions.

I know this isn't super efficient but it's pretty fast, and could be turned into a lazy implementation if I need to.
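A minimal sketch of this flush() idea, assuming a hypothetical buildDataLoaders() that just returns fresh caches in place of real DataLoader instances:

```javascript
function buildDataLoaders() {
  // Each "loader" here is just a fresh cache; real code would construct
  // DataLoader instances keyed by name.
  return { users: new Map(), posts: new Map() };
}

function makeLoaderBucket() {
  const bucket = buildDataLoaders();
  // flush() rebuilds the loaders and clobbers them in place, so any
  // resolver already holding a reference to the bucket sees fresh ones.
  bucket.flush = () => Object.assign(bucket, buildDataLoaders());
  return bucket;
}
```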

I just went to implement subscriptions for the first time and ran into this issue as well. Currently I'm working around it with this patch, applied via patch-package.

What this allows us to do is specify a function contextValueExecution which derives a new context for each new execution phase of the subscription.

// graphqlSubscribe here is the patched subscribe() from 'graphql'
const result = await graphqlSubscribe({
  schema: graphqlSchema,
  document,
  variableValues: variables,
  contextValue: makeDataContext('subscription'),
  contextValueExecution(originalCtx) {
    return makeDataContext('subscription-execution', { parent: originalCtx })
  },
})

I think given the way folks currently use context as a sort of singleton "state container" during query/mutation execution, a similar concept is necessary for subscription executions to work well without too many work-arounds. In particular, the usual reason a subscription re-triggers is that state has changed, so you'd want to start from a fresh context.

@IvanGoncharov let me know what you think of this approach, if it's something that would be reasonable to PR, and if so if the API naming is alright or should be changed.

@tgriesser I really like this solution. The only problem I see, and please correct me if I'm wrong, but it seems this might be predicated on the subscription server being able to read context from the stateless (non-subscription) server?

use case:

  • there exist 1+ stateful servers (state includes subscription iterator, websocket, etc.) and 1+ stateless graphql execution servers.
  • when that socket server receives a subscription request, it calls createSourceEventStream to create the Source Stream (a SubscriptionIterator) and uses that to create a Response Stream
  • when ResponseStream.next() is called, it calls the stateless graphql server (with an optional server ID & dataloader ID so it can reuse the same dataloader that the mutation used)

The only problem I see, and please correct me if I'm wrong, but it seems this might be predicated on the subscription server being able to read context from the stateless (non-subscription) server?

It shouldn't have anything to do with servers, this is purely an addition to allow the mapSourceToResponse code to optionally create a new context, generated via a function that is provided to the original subscribe. How you create that context, or implement it in relation to your servers is up to you.

Any "graphql server", is really just a wrapper around the core execute algorithm, which you can actually call independently in your own code without needing a server. So this is just building on that idea to optionally allow a way to separate the context for subscribe from that of execute.

Gotcha, I think we just use different words. What you call a graphql server I call an execution server. There can also exist a subscription server, which does _not_ execute graphql queries/mutations. In that case, mapSourceToResponse isn't used, so the fix wouldn't help folks who separate the two: https://github.com/graphql/graphql-js/blob/4150d1f51360a7981181cfec42a135394c7340f1/src/subscription/subscribe.js#L194-L197
