Graphene: Graphene real-time subscriptions and Apollo client graphql integration

Created on 3 Mar 2017  ·  83 Comments  ·  Source: graphql-python/graphene

Hello @syrusakbary.

Thanks for all your hard work on graphene and graphql-python. Awesome library!!

I posted this on #393 earlier this week...reposting here so it's easier to discover.

I implemented a port of the apollo graphql subscriptions modules (graphql-subscriptions and subscriptions-transport-ws) for graphene / python. They work w/ apollo-client.

It is here.

Same basic API as the Apollo modules. It is still very rough...but works so far, based on my limited internal testing. It uses redis-py, gevent-websocket, and syrusakbary/promise. I was going to add a simple example app, a setup.py for easier install, and more info on the API to the readme in the next few days. A brief example is below. It only works on Python 2 for now. My plan is to start working on tests as well. I figured I'd go ahead and share at this early stage in case anybody is interested...

I'm very new to open source, so any critiques or pull requests are welcome.

Simple example:

Server (using Flask and Flask-Sockets):

import graphene
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_sockets import Sockets

from .subscription_manager import SubscriptionManager, RedisPubsub
from .subscription_transport_ws import ApolloSubscriptionServer

# Query, Mutation and Subscription are the schema's root types, defined
# elsewhere in the app (Subscription and AddUser are shown further down).
app = Flask(__name__)
sockets = Sockets(app)
pubsub = RedisPubsub()
schema = graphene.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription
)
subscription_mgr = SubscriptionManager(schema, pubsub)

@sockets.route('/socket')
def socket_channel(websocket):
    # One subscription server instance per websocket connection
    subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket)
    subscription_server.handle()
    return []

if __name__ == "__main__":
    from geventwebsocket import WebSocketServer

    server = WebSocketServer(('', 5000), app)
    # Parenthesized form works on both Python 2 and Python 3
    print('  Serving at host 0.0.0.0:5000...\n')
    server.serve_forever()

Of course on the server you have to "publish" each time you have a mutation (in this case to a redis channel). That could look something like this (using graphene / sql-alchemy):

import graphene
import graphene_sqlalchemy

# User (the graphene type), UserModel (the SQLAlchemy model), db and pubsub
# are assumed to be defined elsewhere in the app.

class Subscription(graphene.ObjectType):
    users = graphene_sqlalchemy.SQLAlchemyConnectionField(
        User,
        active=graphene.Boolean()
    )

    def resolve_users(self, args, context, info):
        # Here info.root_value is read as the payload published for this event
        query = User.get_query(context)
        return query.filter_by(id=info.root_value.get('id'))

class AddUser(graphene.ClientIDMutation):

    class Input:
        username = graphene.String(required=True)
        email = graphene.String()

    ok = graphene.Boolean()
    user = graphene.Field(lambda: User)

    @classmethod
    def mutate_and_get_payload(cls, args, context, info):
        _input = args.copy()
        # clientMutationId is optional, so don't fail when the client omits it
        _input.pop('clientMutationId', None)
        new_user = UserModel(**_input)
        db.session.add(new_user)
        db.session.commit()
        ok = True
        # Only publish when at least one client is subscribed
        if pubsub.subscriptions:
            pubsub.publish('users', new_user.as_dict())
        return AddUser(ok=ok, user=new_user)

Client (using react-apollo client):

import React from 'react'
import ReactDOM from 'react-dom'
import { graphql, ApolloProvider } from 'react-apollo'
import gql from 'graphql-tag'
import ApolloClient, { createNetworkInterface } from 'apollo-client'
import { SubscriptionClient, addGraphQLSubscriptions } from 'subscriptions-transport-ws'

import ChatApp from './screens/ChatApp'
import ListBox from '../components/ListBox'

const SUBSCRIPTION_QUERY = gql`
  subscription newUsers {
    users(active: true) {
      edges {
        node {
          id
          username
        }
      }
    }
  }
`

const LIST_BOX_QUERY = gql`
  query AllUsers {
    users(active: true) {
      edges {
        node {
          id
          username
        }
      }
    }
  }
`

class ChatListBox extends React.Component {

  componentWillReceiveProps(newProps) {
    if (!newProps.data.loading) {
      if (this.subscription) {
        return
      }
      this.subscription = newProps.data.subscribeToMore({
        document: SUBSCRIPTION_QUERY,
        updateQuery: (previousResult, {subscriptionData}) => {
          const newUser = subscriptionData.data.users.edges
          const newResult = {
            users: {
              edges: [
                ...previousResult.users.edges,
                ...newUser
              ]
            }
          }
          return newResult
        },
        onError: (err) => console.error(err)
      })
    }
  }

  render() {
    return <ListBox data={this.props.data} />
  }
}

const ChatListBoxWithData = graphql(LIST_BOX_QUERY)(ChatListBox)

export default ChatListBoxWithData

const networkInterface = createNetworkInterface({
  uri: 'http://localhost:5000/graphql'
})

const wsClient = new SubscriptionClient(`ws://localhost:5000/socket`, {
  reconnect: true
})

const networkInterfaceWithSubscriptions = addGraphQLSubscriptions(
  networkInterface,
  wsClient,
)

const client = new ApolloClient({
  dataIdFromObject: o => o.id,
  networkInterface: networkInterfaceWithSubscriptions
})

ReactDOM.render(
  <ApolloProvider client={client}>
    <ChatApp />
  </ApolloProvider>,
  document.getElementById('root')
)
wontfix

All 83 comments

This is awesome!

I will take a closer look next week and provide some feedback then :)

@syrusakbary Thoughts?

Nice!!!

@syrusakbary Any update on this?

I took a look in the implementation and besides some small nits (like Apollo* naming) it looked good!

I'm waiting on a few things:

  • Having a subscriptions integration with Django, so we can ensure that the subscriptions structure is abstracted in a scalable way.
  • The formal subscription RFC in the GraphQL spec to be merged https://github.com/facebook/graphql/pull/305

Thanks for the comments @syrusakbary. I think your graphql / graphene / promises libraries are amazing...any constructive criticism you have I'm happy to hear. I only used the "Apollo.." naming convention because my implementation was initially based on their (Apollo's) graphql subscription transport protocol. But, as the final spec should be merged soon, I'm happy to drop that convention, since it only affects the main subscription transport class. I've been tied up the last couple of months, since I published this, so I haven't been able to devote much time to improving it. Some priorities in the near term for me:

  • Finish up tests. I'm about done w/ the last bit (focused on the transport module) and should be publishing them in the next few days.
  • Add Python 3 support
  • Add Django support
  • Finish simple example chat app
  • Abstract the concurrency executor (it uses gevent now...but obviously with Python >3.5 I would like to have asyncio as an option, and also be able to use regular threads for simple dev / testing); I was thinking of something like what you did in graphene / graphql-core
  • Add a simple pubsub class for testing / development, so that RedisPubsub doesn't have to be used
  • Lots of other things could be done obviously...my time being the limiting factor there. Once I finish the tests, it will obviously be easier for folks to contribute.

Thanks!
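As an illustration of the "simple pubsub class for testing / development" item above, here is a minimal in-process sketch. It is not part of the library discussed in this thread: the class name `InMemoryPubsub` and its `subscribe` / `unsubscribe` method signatures are assumptions, loosely modeled on the `pubsub.publish('users', ...)` call and the `pubsub.subscriptions` attribute used in the original example.

```python
from itertools import count
from typing import Any, Callable, Dict, Tuple

Handler = Callable[[Any], None]


class InMemoryPubsub:
    """Hypothetical in-process pub/sub for tests; no Redis required."""

    def __init__(self) -> None:
        self._ids = count(1)
        # subscription id -> (channel name, handler)
        self.subscriptions: Dict[int, Tuple[str, Handler]] = {}

    def subscribe(self, channel: str, handler: Handler) -> int:
        """Registers a handler on a channel and returns its subscription id."""
        sub_id = next(self._ids)
        self.subscriptions[sub_id] = (channel, handler)
        return sub_id

    def unsubscribe(self, sub_id: int) -> None:
        """Removes a subscription; unknown ids are ignored."""
        self.subscriptions.pop(sub_id, None)

    def publish(self, channel: str, message: Any) -> int:
        """Calls every handler on the channel synchronously; returns how many ran."""
        handlers = [h for (name, h) in self.subscriptions.values() if name == channel]
        for handler in handlers:
            handler(message)
        return len(handlers)
```

Because delivery is synchronous, a test can assert on the received messages immediately after calling `publish`, without running an event loop or a Redis server.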

@hballard @syrusakbary With the official spec now merged, is there room for contributions on getting subscriptions implemented? I'm interested in using graphene with a new django project, and this would be huge.

Indeed. This would make Django a great fit for angular2 real time apps

@Helw150 - I can't speak for @syrusakbary, but I know I'd welcome any contributions on my repo. Also, I'm not sure whether @syrusakbary is interested in integrating it into graphene eventually, prefers to fork it, or wants to go his own way. I started this mainly as a hobby project, when I was playing w/ Apollo subscriptions and noticed there wasn't an implementation for graphene (python being my preferred server language). I just pushed a commit to the "tests" branch with about half the subscriptions-transport tests; all the subscriptions-manager tests were added a few weeks ago. The rest of the transport tests should be easier to finish up now; the initial test setup for those slowed me down a bit, since some of it was new for me. I haven't had a ton of time to devote to this and I don't really use django...so I'm not sure when I would get to that. My next focus would be Python 3 compatibility...which might not be that difficult. Of course, now that the initial commits for graphql subscriptions have been added to the spec, probably lots more needs to be done outside of these focuses. You can read my summary of the transport tests commit on the issue I created for them here

Update: Initial tests were added last weekend (see commit here), and last week I merged a commit that added Python 3 compatibility (2.7, 3.4, 3.5, & 3.6). My next two priorities are adding additional executors (threads, asyncio, etc.) and some type of Django compatibility. I don't really use Django...for those that do...is channels the preferred method for adding real-time services to Django now, or something like django-websocket-redis or django-socketio? I've been doing a little reading on Django and channels...

Great work @hballard. I'd like to use this as inspiration for an example in my ReactQL starter kit to show how subscriptions can be served from a non-Node.js server.

@leebenson - Very cool. Any feedback you can provide is appreciated. Let me know if I can be of assistance.

Per @syrusakbary's previous comment above, "Having a subscriptions integration with Django so we assure that the subscriptions structure is abstracted in a scalable way..."; I've been thinking about the best way to do that.

My thought is that it should be fairly straightforward to generalize the concurrency executor methods like @syrusakbary did in graphql-core and have separate executor classes for each concurrency library (probably will even borrow some of the logic in graphql-core for each one). Then the RedisPubsub and SubscriptionTransport classes would utilize the corresponding executor passed in by the user when they instantiate each class. I'd welcome any feedback anyone has (@syrusakbary or others) on this structure.
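A rough sketch of that structure, using only the standard library. These are not the graphql-core executor classes: `ThreadExecutor`, `AsyncioExecutor`, `Broadcaster` and the `execute` / `join` method names are hypothetical, chosen only to show how a pubsub or transport class could stay agnostic of the concurrency library it is given.

```python
import asyncio
import threading
from typing import Any, Callable, List


class ThreadExecutor:
    """Runs each task in its own background thread (simple dev / testing)."""

    def __init__(self) -> None:
        self._threads: List[threading.Thread] = []

    def execute(self, fn: Callable[..., Any], *args: Any) -> None:
        thread = threading.Thread(target=fn, args=args, daemon=True)
        self._threads.append(thread)
        thread.start()

    def join(self) -> None:
        """Blocks until every submitted task has finished."""
        for thread in self._threads:
            thread.join()
        self._threads.clear()


class AsyncioExecutor:
    """Schedules each task on an asyncio event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._tasks: List["asyncio.Task[Any]"] = []

    def execute(self, fn: Callable[..., Any], *args: Any) -> None:
        async def runner() -> Any:
            result = fn(*args)
            # Accept both plain callables and coroutine functions
            if asyncio.iscoroutine(result):
                result = await result
            return result

        self._tasks.append(self._loop.create_task(runner()))

    def join(self) -> None:
        """Runs the loop until every submitted task has finished."""
        if self._tasks:
            self._loop.run_until_complete(asyncio.wait(self._tasks))
            self._tasks.clear()


class Broadcaster:
    """Stand-in for a pubsub / transport class that receives an executor."""

    def __init__(self, executor: Any) -> None:
        self._executor = executor

    def broadcast(self, handlers: List[Callable[[Any], Any]], message: Any) -> None:
        for handler in handlers:
            self._executor.execute(handler, message)
```

With this shape, `Broadcaster(ThreadExecutor())` and `Broadcaster(AsyncioExecutor())` behave the same from the caller's point of view; a gevent or Django channels executor would be one more class with the same two methods.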

I spent a couple hours reading through the Django channels library this weekend and it would seem it could be treated as just another executor class under this model. Also, anyone familiar w/ Django...seems like I would utilize a "class-based consumer" (inherit from WebsocketConsumer) for the SubscriptionServer. The redis pubsub "wait_and_get_message" method in the RedisPubsub class could be implemented as another consumer. Thoughts on this (from anyone more familiar w/ Django channels)?

How about getting someone from the django core team to assist?
I think reactive programming is something that would bring new people to django which would benefit in the long run. It was the reason for me to look into other solutions.
What do you think?

I'm happy to have any assistance from another contributor--particularly a Django core developer. I haven't reached out to them since I don't use django and I first needed to abstract the concurrency executor from the base subscriptions manager and websocket server logic, in order to use other concurrency frameworks (like django channels). It would be fairly straightforward to integrate the current gevent version with django-socketio, or to create a simple package similar to flask-sockets to integrate geventwebsocket into django directly. I found a small library on bitbucket that seems to do just that -- django-gevent-websocket (here).

I'm currently working on abstracting the concurrency executor to be able to use asyncio for concurrency as well (vs the current gevent). I should merge a version of that with master in the next week or so (which will allow use w/ the Sanic web framework using uvloop / asyncio)...and then I was going to turn my attention to django integration, using the same abstraction base. But my plan was to focus more on django-channels...since that seems to be the way forward for django concurrency.

Thank you for the update @hballard that sounds really awesome. :D

I am using django in some projects and would love to replace drf (django-rest-framework) with what you are building. I really prefer reactive programming and I think a graphQL api with subscriptions would totally add a lot of value on top of the django project.
I am not a core django person, nor do I have experience with django-channels. I do however have practical experience with django. Feel free to reach out to me if it helps. ;)

@hballard will you be at EuroPython? maybe we can find someone to help during the sprints! :)

Afraid not...I live in Texas (US)...and that would be a bit of a hike!

Did you get to give it a try yet? :)

@Eraldo @hballard Yes, I think I should post an update here, as I'm now fully focused on subscriptions.

Some thoughts about my journey: the way Apollo-Subscriptions used to manage subscriptions was not very friendly for the developer. It required hacking around the resolution and relied on a specific PubSub implementation that was "bypassing" the GraphQL engine to adapt it for subscriptions.
The reason is that the GraphQL-js engine was not ready for subscriptions (meaning it was only able to return either a promise or a static value, but not an async iterator).

However, GraphQL-js recently added a way to subscribe to a GraphQL query (one that returns an async iterator, a.k.a. Observable), which pushed towards simpler and cleaner implementations of subscriptions that decouple the subscription resolution from the "listener" on the subscription.

That led to better implementations of the transport mechanisms in GraphQL subscriptions, like subscriptions-transport-ws.

So, in summary, subscriptions should be bundled fully into the GraphQL engine, in a way that makes it easy to plug in any mechanism, such as:

  • websockets with asyncio
  • websockets with gevent
  • websockets with django-daphne

None of these should require a specific pub/sub implementation; that decision is eventually left to the developer (in case they want to use one).

For the next version of Graphene, 2.0, I plan to have subscriptions bundled into the engine :)
There is already a branch in graphql-core where I'm doing the research.

I will keep updating this thread with more information as I keep working on it.
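The decoupling described in this comment can be sketched with plain asyncio, without any GraphQL library. This only illustrates the idea and is not the API of Graphene, graphql-core or GraphQL-js: `user_events`, `resolve_user`, `subscribe` and `demo` are hypothetical names, and an `asyncio.Queue` stands in for whatever event source the developer chooses.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional

Event = Dict[str, Any]


async def user_events(queue: "asyncio.Queue[Optional[Event]]") -> AsyncIterator[Event]:
    """Source stream: yields raw events until a None sentinel arrives."""
    while True:
        event = await queue.get()
        if event is None:
            return
        yield event


def resolve_user(event: Event) -> Event:
    """Resolution step: shapes one raw event into a response payload."""
    return {"data": {"user": {"id": event["id"], "username": event["username"]}}}


async def subscribe(
    source: AsyncIterator[Event], resolver: Callable[[Event], Event]
) -> AsyncIterator[Event]:
    """Maps every event of the source stream through the resolver."""
    async for event in source:
        yield resolver(event)


async def demo() -> List[Event]:
    queue: "asyncio.Queue[Optional[Event]]" = asyncio.Queue()
    for item in ({"id": 1, "username": "ann"}, {"id": 2, "username": "bob"}, None):
        queue.put_nowait(item)
    # The "listener" (a websocket handler, say) only iterates over the result;
    # it knows nothing about the queue or about how payloads are resolved.
    return [payload async for payload in subscribe(user_events(queue), resolve_user)]
```

Swapping the queue for a Redis listener, an MQTT client or a Django signal would change only `user_events`; the resolver and the listener stay untouched, which is the point of returning an async iterator from the engine.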

@syrusakbary what ended up being the recommended approach on this? I'm using graphql python and loving it, and I'm working on an IoT device project where they use MQTT for data observations. Curious if there is a recommended approach or if it is 'do what thou wilt' for subscriptions in graphql python.

Thanks!

Can we have an example of how subscriptions work with Graphene in Django?

Thanks a lot, I have been trying to find an example for the last 2 days.

Hi @AgentChris, take a look at this module, maybe you might be interested:
graphene-django-subscriptions

I would like to know what is the progress in the implementation of subscriptions into the core of Graphene, could someone clarify please?

@Oxyrus it seems that the mechanism has been decided to be rx observables but the method for delivery to the client is still up to you.

Here's a gist with my solution using django-channels. It's a working proof of concept. Next task is optimize it for production environment. Feedback welcome!

@tricoder42 interesting code layout. I was using observables to emit subscriptions, but I was having problems making django pretend to be async. Your approach seems cleaner.
To avoid repeating queries you could use promises and dataloaders to group as much repetitive work to just one computation.

@japrogramer What do you mean by dataloaders to group?

The main problem is that workers run in different processes, so I need to run the query at least once per process. The second problem is that workers are intended to be short-lived, so I can't take it for granted that the parsed query is available. A worker can be restarted at any time.
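Within a single worker process, the grouping idea from this exchange could look roughly like the sketch below. It uses plain dictionary grouping as a simpler stand-in for promise / DataLoader batching, and every name in it (`fan_out`, the subscriber tuples, the `execute` callback) is hypothetical.

```python
import json
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

# (client id, query text, variables)
Subscriber = Tuple[str, str, Dict[str, Any]]


def fan_out(
    subscribers: List[Subscriber],
    execute: Callable[[str, Dict[str, Any]], Any],
) -> Dict[str, Any]:
    """Runs each distinct (query, variables) pair once and shares the result."""
    groups: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for client_id, query, variables in subscribers:
        # Serialize variables so the pair can be used as a dictionary key
        key = (query, json.dumps(variables, sort_keys=True))
        groups[key].append(client_id)

    results: Dict[str, Any] = {}
    for (query, variables_json), client_ids in groups.items():
        result = execute(query, json.loads(variables_json))
        for client_id in client_ids:
            results[client_id] = result
    return results
```

This does not remove the once-per-process cost mentioned above; it only avoids executing the same query several times inside one process when many clients subscribed with identical documents and variables.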

@tricoder42 So Is there hope of getting this into graphene or not? Should it be spun off into its own library?

The only part which would fit into graphene is the Subscription class. Everything else depends on the backend (django-channels or redis), so it might be better to keep it either in a separate package or locally in the project. Anyway, django-channels 2.0 is on the way, which makes the implementation a bit cleaner.


@tricoder42 how does django-channels 2.0 make the implementation cleaner?

@japrogramer I haven't tried it yet, but it should be possible to register custom callbacks to a Group.

Right now it's only possible to add another Channel to a Group, and when a message is sent, it's automatically broadcast to all channels in the Group. This is enough if you know the shape of the data before sending a message. E.g. if you have a Serializer (like in a REST API), you can listen for the post_save signal, serialize the new instance and broadcast it to the Group.

In GraphQL, however, we don't know what shape of data user requested, so we need to serialize instance for each Channel (subscriber) in a group separately. That's why I'm having two layers of notifications - one for model changes and second for graphql subscriptions. In channels 2.0 it should be possible to register a callback which feeds data to Observable and returns serialized data.
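To make the difference concrete, here is a small sketch of per-subscriber serialization. It involves no Django or channels code: flat lists of field names stand in for real GraphQL selection sets, and `serialize_for` and `notify_group` are hypothetical helpers.

```python
from typing import Any, Dict, Iterable


def serialize_for(instance: Dict[str, Any], fields: Iterable[str]) -> Dict[str, Any]:
    """Keeps only the fields that one subscriber asked for."""
    return {name: instance[name] for name in fields}


def notify_group(
    instance: Dict[str, Any],
    subscriptions: Dict[str, Iterable[str]],
) -> Dict[str, Dict[str, Any]]:
    """Builds one payload per channel instead of one payload for the whole group.

    `subscriptions` maps a channel name to the field names it requested.
    """
    return {
        channel: serialize_for(instance, fields)
        for channel, fields in subscriptions.items()
    }
```

A REST-style broadcast would serialize the instance once and send the same message to every channel; here each channel gets a payload shaped by its own request, which is why a second notification layer is needed.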

After first real-world implementation and testing I updated the gist with recent code changes:

  • added session handling
  • fixed GraphQLSubscriptionStore - subscription_id isn't unique across clients
  • added cleanup on ws.disconnect

Still using django-channels 1.x.

Any examples of Graphql/Graphene using subscriptions in a Flask App? Hopefully something comparable with Apollo suite ?

@tricoder42 I just did some experimenting with Channels 2.0 this is what I have ..
I'm still digging into the docs. I also don't have much time these days, but I'll update once I get something more substantial going.

Note: line 62, probably needs to be made sync since I don't think on_next can be async

 1   from django.utils.translation import get_language
  1 import json
  2 # import functools
  3 import asyncio
  4 import rx
  5
  6 from channels.consumer import AsyncConsumer
  7
  8 from graphene_django.settings import graphene_settings as gqsettings
  9 from .views import DataLoaders
 10
 11
 12 schema = gqsettings.SCHEMA
 13
 14
 15 class GQLConsumer(AsyncConsumer):
 16     # NOTE: asgiref.SyncToAsync for django ORM
 17
 18     async def graphsend(self, opID, result, message):
 19         data = result.data
 20         await self.send(
 21             {
 22                 'type': 'websocket.send',
 23                 'text': str(json.dumps({'data': data, 'type': 'data', 'id': opID}))
 24             })
 25
 26     # @allowed_hosts_only
 27     async def websocket_connect(self, event):
 28         # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
 29         #  TODO: This might need some security, auth users or apps only <10-11-17> #
 30         await self.send({
 31             "type": "websocket.accept",
 32         })
 33
 34     async def websocket_receive(self, message):                                                                                                                                                                                                                                                                                                                                                                                       
 35         # message is gone from the call signature, need to inspect the content of text_data and bytes_data                                                                                                                                                                                                                                                                                                                            
 36         clean = json.loads(message['text'])                                                                                                                                                                                                                                                                                                                                                                                           
 37         gqtype = clean.get('type')                                                                                                                                                                                                                                                                                                                                                                                                    
 38         clean = clean.get('payload')                                                                                                                                                                                                                                                                                                                                                                                                  
 39                                                                                                                                                                                                                                                                                                                                                                                                                                       
 40         if gqtype == 'connection_init':                                                                                                                                                                                                                                                                                                                                                                                               
 41             await self.send({'type': 'websocket.send', 'text': json.dumps({'type': 'connection_ack'})})                                                                                                                                                                                                                                                                                                                               
 42         elif gqtype == 'start':                                                                                                                                                                                                                                                                                                                                                                                                       
 43             __import__('pdb').set_trace()                                                                                                                                                                                                                                                                                                                                                                                             
 44             self.operationName = clean.get('operationName')                                                                                                                                                                                                                                                                                                                                                                           
 45             self.query = clean.get('query')                                                                                                                                                                                                                                                                                                                                                                                           
 46             self.foovar = clean.get('variables')                                                                                                                                                                                                                                                                                                                                                                                      
 47                                                                                                                                                                                                                                                                                                                                                                                                                                       
 48             # This part acts like a request                                                                                                                                                                                                                                                                                                                                                                                           
 49             message = dict()                                                                                                                                                                                                                                                                                                                                                                                                          
 50             message['reply_channel'] = self.channel_name                                                                                                                                                                                                                                                                                                                                                                              
 51             message['scope'] = self.scope                                                                                                                                                                                                                                                                                                                                                                                             
 52             message['dataloaders'] = DataLoaders(get_language())                                                                                                                                                                                                                                                                                                                                                                      
 53             self.kwargs = {'context_value': message}                                                                                                                                                                                                                                                                                                                                                                                  
 54                                                                                                                                                                                                                                                                                                                                                                                                                                       
 55             #  TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #                                                                                                                                                                                                                                                                                                                           
 56             #  TODO: Implement timeout mechanism <10-11-17> #                                                                                                                                                                                                                                                                                                                                                                         
 57             result = schema.execute(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)                                                                                                                                                                                                                                                                                                                 
 58             if isinstance(result, rx.Observable):                                                                                                                                                                                                                                                                                                                                                                                     
 59                 class MyObserver(rx.Observer):                                                                                                                                                                                                                                                                                                                                                                                        
 60                                                                                                                                                                                                                                                                                                                                                                                                                                       
 61                     def on_next(self, x):                                                                                                                                                                                                                                                                                                                                                                                             
 62                         self.graphsend(self.operationName, x, message)                                                                                                                                                                                                                                                                                                                                                                
 63                                                                                                                                                                                                                                                                                                                                                                                                                                       
 64                     def on_error(self, e):                                                                                                                                                                                                                                                                                                                                                                                            
 65                         ...                                                                                                                                                                                                                                                                                                                                                                                                           
 66                                                                                                                                                                                                                                                                                                                                                                                                                                       
 67                     def on_completed(self):                                                                                                                                                                                                                                                                                                                                                                                           
 68                         ...                                                                                                                                                                                                                                                                                                                                                                                                           
 69                                                                                                                                                                                                                                                                                                                                                                                                                                       
 70                 result = result.publish().auto_connect()                                                                                                                                                                                                                                                                                                                                                                              
 71                 result.subscribe(MyObserver())                                                                                                                                                                                                                                                                                                                                                                                        
 72         elif gqtype == 'stop':                                                                                                                                                                                                                                                                                                                                                                                                        
 73             operationName = clean.get('operationName')                                                                                                                                                                                                                                                                                                                                                                                
 74             await self.channel_layer.group_discard(operationName, self.channel_name)                                                                                                                                                                                                                                                                                                                                                  
 75         else:                                                                                                                                                                                                                                                                                                                                                                                                                         
 76             await self.send({'type': 'websocket.send', 'text': json.dumps({'data': 'connection_ack'})})                                                                                                                                                                                                                                                                                                                               
 77                                                                                                                                                                                                                                                                                                                                                                                                                                       
 78     async def websocket_disconnect(self):                                                                                                                                                                                                                                                                                                                                                                                             
 79         if 'Groups' in self.scope['session']:                                                                                                                                                                                                                                                                                                                                                                                         
 80             for x in self.scope['session']['Groups'].split(','):                                                                                                                                                                                                                                                                                                                                                                      
 81                 ...                                                                                                                                                                                                                                                                                                                                                                                                                   
 82                 # for every Group in the session, unsubscribe current connection                                                                                                                                                                                                                                                                                                                                                      
 83                 await self.channel_layer.group_discard(x, self.channel_name)                                                                                                                                                                                                                                                                                                                                                          
 84             # finally del the Groups from the session                                                                                                                                                                                                                                                                                                                                                                                 
 85             del self.scope['session']['Groups']                                                                                                                                                                                                                                                                                                                                                                                       

@kavink

Not with Flask, but I did build something with Sanic, which is Flask-like. If you are interested, I can send it to you.

See here: https://github.com/graphql-python/graphene/issues/545

Adam Hopkins

On Feb 7, 2018, 4:13 AM +0200, kavink notifications@github.com, wrote:

Any examples of Graphql/Graphene using subscriptions in a Flask App? Hopefully something comparable with Apollo suite ?

Also super interested in anything with Flask or Flask-like.

@ahopkins Please can you post the example someplace. Would love to look at it and maybe port it to flask.

@ahopkins Thanks! How does one publish to the feed and subscribe from the server side? I.e. how can I modify the functions to either send data into the feed for the client or listen to the client from the feed?

@kavink graphene is intended to use RxPY for that part. I'm currently trying to work out how to use that with channels, so I don't know that much about it, but hopefully it leads you down the right road.

@tricoder42 Thank you for your promising code!

Is it possible to use Group instead of GraphQLSubscriptionStore to store queries (as sending to multiple channels in a group is optimized)?

For example:

from channels import Group
from django.db.models.signals import post_save
from django.dispatch import receiver

# subscribed_groups = {'User': set(), ...}
# subscription_string = transform('subscription mysub on UserNode { ... }')
def on_message(message):
    ...
    subscription_string = 'sub-%s' % hash(message.content['payload'])
    Group(subscription_string).add(reply_channel)
    model_name = message.content['model']
    subscribed_groups[model_name].add(subscription_string)
    ...

# `model` is a placeholder for the model class being watched
@receiver(post_save, sender=model)
def send_model_update(sender, **kwargs):
    for group_name in subscribed_groups[sender.__name__]:
        Group(group_name).send(...)

@heyrict Unfortunately you can't in the general case, because each client might be subscribed to different data. In other words: two clients might use the same subscription, but still fetch different data. I'm using a custom redis store because I need to keep the graphql query around. In django-channels 1.x, workers and the interface are different processes, so I can't share data using globals either.

However, in django-channels 2.x this is solved differently, I'm gonna try it soon.
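To illustrate the point about two clients using the same subscription but fetching different data, here is a minimal sketch of a group key that includes everything that can change the result. It is not code from the gist: `subscription_key` and the `user_id` argument are hypothetical names, and it assumes the result depends only on the query text, the variables and the user. It also uses `hashlib` rather than the built-in `hash()`, because on Python 3 the hash of a string is randomized per process by default, so separate worker processes would not agree on it.

```python
import hashlib
import json


def subscription_key(query, variables, user_id):
    """Derive a group name from everything assumed to affect the result."""
    canonical = json.dumps(
        {"query": query, "variables": variables, "user": user_id},
        sort_keys=True,
        separators=(",", ":"),
    )
    # sha256 gives the same digest in every process, unlike hash() on str.
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "sub-" + digest[:16]


QUERY = "subscription AlertsSubscription { alerts { id read } }"

key_alice = subscription_key(QUERY, {"unreadOnly": True}, user_id=1)
key_alice_again = subscription_key(QUERY, {"unreadOnly": True}, user_id=1)
key_bob = subscription_key(QUERY, {"unreadOnly": True}, user_id=2)

print(key_alice == key_alice_again)  # same inputs share a group
print(key_alice == key_bob)          # different user, different group
```

Clients only end up in the same group when all three inputs match, which is why grouping by subscription name alone is not safe in general.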

@japrogramer @ahopkins Thanks for the gist and the pointer to RxPY, I will certainly look at it. What I'm trying to understand is how it would all work together: Graphene, RxPY and the gist (GraphQL subscriptions). I.e. should I use RxPY to publish to and read from the feed? I don't see a way to create channels in RxPY, so architecture-wise, which components tie together? With that I can try to work backwards and implement something working.

I just submitted a pull request to graphql-ws that gives an example of a publish - subscription implementation, to make it easier to use subscriptions with graphql-ws. I modified the README and examples to show how it might work. Not sure if @syrusakbary would want this as a part of the graphql-ws library or in a separate one. Here is my fork in case you want to try it out.

@japrogramer How do you broadcast changes in DB? Do you use django signals?

I've updated my project to django-channels 2.x and also the gist.

TL;DR:

  • post_save listener sends messages to django.{app_label}.{model} group.
  • when new subscription is created, consumer's channel is added to django.{app_label}.{model} (this defaults to subscription's output_type, but might be overridden in Subscription.subscribe method)
  • when a message from the post_save signal is received, the query is executed again, but this time with a (pk, model_label) tuple as the root_value. Subscription.next receives this tuple as its first parameter and returns None or the corresponding object from the db.
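The group naming in the points above can be sketched without Django or channels. This is only an illustration under assumptions: `groups` and `outbox` are hypothetical in-memory stand-ins for the channel layer, `on_post_save` stands in for a real post_save receiver, and lowercasing the model name is a guess at the convention rather than something the gist confirms.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the channel layer's groups.
groups = defaultdict(set)
# Messages delivered to each consumer channel.
outbox = defaultdict(list)


def group_name(app_label, model_name):
    """Build the django.{app_label}.{model} group name."""
    return "django.{}.{}".format(app_label, model_name.lower())


def subscribe(channel_name, app_label, model_name):
    """A new subscription adds the consumer's channel to the model's group."""
    groups[group_name(app_label, model_name)].add(channel_name)


def on_post_save(app_label, model_name, pk):
    """Stand-in for the post_save listener: notify every channel in the group."""
    name = group_name(app_label, model_name)
    message = {
        "pk": pk,
        "model_label": "{}.{}".format(app_label, model_name.lower()),
    }
    for channel_name in sorted(groups[name]):
        # Each consumer would now re-run its own query with (pk, model_label).
        outbox[channel_name].append(message)


subscribe("consumer-A", "alerts", "Alert")
subscribe("consumer-B", "alerts", "Alert")
on_post_save("alerts", "Alert", pk=7)
print(outbox["consumer-A"])
```

Only the primary key and the model label travel through the group, so the message stays serializable and each consumer still executes its own query.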

I wish I could parse the query just once and pass a coroutine to the Observable. At initialization, the coroutine would subscribe to django.{app_label}.{model}, and then I would simply call coroutine.send((pk, model)) and the Observer would send serialized data to the client. However, it seems that whenever I pass an iterable to an observable, it always tries to consume it whole, even when it should wait for new data. I'm kinda lost here, but it's just an optimization. The implementation works as it is; now I'm just struggling with unit tests.

Any feedback welcome,
cheers!

@tricoder42 You could try using rx.Observable.from_iterable. Also take a look at this page for examples; here is a direct link to a relevant example: https://github.com/thomasnield/oreilly_reactive_python_for_data/blob/master/class_notes/class_notes.md#44---an-observable-emitting-tweets

I'm still trying to wrap my head around observables. I like how, if the resolve method for the subscription receives an object instance, it publishes a new result.

@japrogramer Thank you for the link! I finally got it, see updated gist.

Key points:

  • StreamObservable - simple observable which you can send data into.
  • Subscription.resolve - subscribes to model_changes and maps data from the observable to Subscription.next, which returns the instance to be sent as a new result

The pipeline is as follows:

  1. channel model_changed - notification is received on model_changed channel
  2. StreamObservable.send - calling stream.send pushes new data down the stream
  3. Subscription.next - the (pk, model) tuple is resolved to a model instance
  4. GraphQL Executor - all results from observable are serialized using GraphQL executor
  5. GraphqlSubcriptionConsumer._send_result - finally, new data are sent to client
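The pipeline above can be sketched without any framework. This is a stand-in, not the gist's code: `StreamObservable` here is a hypothetical pure-Python class that imitates what a subject does, `FAKE_DB`, `next_alert` and `serialize` are invented for illustration, and the list `sent` plays the role of the WebSocket.

```python
import json


class StreamObservable:
    """Tiny stand-in for a subject: values pushed with send() reach all observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def map(self, fn):
        # Return a new stream that emits fn(value) for each value of this one.
        mapped = StreamObservable()
        self.subscribe(lambda value: mapped.send(fn(value)))
        return mapped

    def send(self, value):
        for on_next in list(self._observers):
            on_next(value)


# Hypothetical data: a fake table keyed by (model_label, pk).
FAKE_DB = {("alerts.alert", 1): {"id": 1, "code": "E42", "read": False}}


def next_alert(change):
    """Step 3: resolve a (pk, model_label) tuple to an instance, or None."""
    pk, model_label = change
    return FAKE_DB.get((model_label, pk))


def serialize(instance):
    """Step 4: stand-in for the GraphQL executor."""
    return {"data": {"alerts": instance}}


sent = []  # Step 5: stand-in for the WebSocket connection.

stream = StreamObservable()  # fed by the model_changed channel
results = stream.map(next_alert).map(serialize)
results.subscribe(lambda payload: sent.append(json.dumps(payload)))

stream.send((1, "alerts.alert"))  # Steps 1-2: a change notification arrives
print(sent[0])
```

Nothing is pulled from the stream; each `send` pushes one value through the mapped stages, which is what avoids the "consume the whole iterable" problem described earlier in the thread.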

@tricoder42 what query are you using with your code? doesn't the model get read from the database for each subscriber?
Did you mean to pass an observer to your stream initiation here L71 ?

@japrogramer Yeah, each subscriber needs to load the instance from the DB. It may be optimised for a single worker, but once you start scaling up, each subscriber might be handled by a different process, and you can only pass serializable objects through channels.

StreamObservable could be actually replaced with rx.Subject which does the same.

Example query is:

subscription AlertsSubscription {
  alerts {
      id,
      code,
      date,
      read
  }
}

@tricoder42 how about using dataloaders from promises to fetch the instance so that the db is only hit once per process?
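A rough sketch of the idea: several subscribers handled by the same process share one loader, so the instance is fetched once per change. `InstanceLoader` and `fetch_alert` are hypothetical names, and this is a simplified synchronous stand-in; a real dataloader is promise-based and also batches keys.

```python
class InstanceLoader:
    """Caches loaded instances so each key hits the backing store once per process."""

    def __init__(self, fetch):
        self._fetch = fetch  # callable: key -> instance
        self._cache = {}

    def load(self, key):
        if key not in self._cache:
            self._cache[key] = self._fetch(key)
        return self._cache[key]

    def clear(self, key):
        # Drop a stale entry, e.g. when the next change notification arrives.
        self._cache.pop(key, None)


db_hits = []  # records every simulated database query


def fetch_alert(pk):
    """Hypothetical database lookup."""
    db_hits.append(pk)
    return {"id": pk, "read": False}


loader = InstanceLoader(fetch_alert)

# Three subscribers in the same process react to the same change.
for _subscriber in range(3):
    loader.load(7)
print(len(db_hits))

# A later change must invalidate the cached instance first.
loader.clear(7)
loader.load(7)
print(len(db_hits))
```

The cache has to be cleared on every notification, otherwise subscribers would keep receiving the instance as it was at the first load.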

Dear @syrusakbary, on July 30 you made a great comment about where the subscriptions implementation should live, and you also said that you had started the research in the branch. I just wanted to ask for an update. Did your research succeed? Should we expect subscriptions to become part of the Graphene library eventually? I see there are a couple of implementations available (e.g. GraphQL Subscription with django-channels, graphene-django-subscriptions), but none of them can be used in production out of the box (for different reasons).

@prokher Actually, I'm using @tricoder42's implementation of graphql subscriptions in production (with minor modifications, as I'm using python3.5 and a redis backend), and it turns out to work perfectly (and is much cleaner than graphene-django-subscriptions).

Making bridges between two packages may be time consuming, but if you want to use subscription right now, I recommend you to simply migrate the code to your project, as it won't be much work.

@heyrict Thank you very much, we came to the same conclusion. The code in the gist is indeed very simple and clean. That is definitely the way to go. Many thanks to @tricoder42.

BTW, just for information: the protocol used by the subscriptions-transport-ws (WebSocket-based transport used by Apollo subscriptions) is described here.
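For orientation, here is a minimal sketch of how a server might dispatch the client frames of that protocol. The message type strings (`connection_init`, `connection_ack`, `start`, `data`, `stop`, `connection_terminate`, `error`) are given as I understand them from the protocol document, so please check them there. `handle_message` and `data_frame` are hypothetical helpers, and answering unknown types with an error frame is a choice made for this sketch.

```python
import json


def data_frame(op_id, data, errors=None):
    """Build the frame that carries one subscription result to the client."""
    return {
        "id": op_id,
        "type": "data",
        "payload": {"data": data, "errors": errors},
    }


def handle_message(raw, subscriptions):
    """Return the replies for one client frame.

    `subscriptions` maps operation id -> payload of the active operations.
    Operations are keyed by the top-level `id`, not by operationName.
    """
    message = json.loads(raw)
    msg_type = message.get("type")
    op_id = message.get("id")

    if msg_type == "connection_init":
        return [{"type": "connection_ack"}]
    if msg_type == "start":
        # Remember the operation; results follow later as data frames.
        subscriptions[op_id] = message.get("payload") or {}
        return []
    if msg_type == "stop":
        subscriptions.pop(op_id, None)
        return []
    if msg_type == "connection_terminate":
        subscriptions.clear()
        return []
    return [{
        "type": "error",
        "id": op_id,
        "payload": {"message": "Unknown message type"},
    }]


active = {}
print(handle_message(json.dumps({"type": "connection_init", "payload": {}}), active))
```

Keying by `id` matters because a `stop` frame carries no payload, so there is no `operationName` to read from it.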

What do you think about not sending useless data back to the client?
I am actually getting lots of subscription results that look like {'subscriptionName': None}.

    def _send_result(self, id, result):
        # Don't send results if no useful data is generated
        errors = result.errors
        if not errors:
            if not isinstance(result.data, dict):
                return
            if all(value is None for value in result.data.values()):
                return

        self.send({
            'type':
            'websocket.send',
            'text':
            json.dumps({
                'id': id,
                'type': 'data',
                'payload': {
                    'data': result.data,
                    'errors': list(map(str, errors)) if errors else None,
                }
            })
        })
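The filtering rule above can be pulled out into a small predicate that is easy to test on its own. `has_useful_data` is a hypothetical helper name; the rule itself (skip non-dict data and dicts whose values are all None) is taken from the snippet.

```python
def has_useful_data(data):
    """Return True if a subscription result is worth sending to the client."""
    if not isinstance(data, dict):
        return False
    # At least one field must carry a non-None value.
    return any(value is not None for value in data.values())
```

Results that carry errors are still sent in the snippet above, so a caller would check for errors first and only then apply this predicate.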

Hey, quick follow-up: has anyone managed to make an asynchronous consumer for subscriptions?

Currently I'm using concurrent threads as the executor, but I'm switching over to
from graphql.execution.executors.asyncio import AsyncioExecutor

I'm mocking out some code. Here is what I have so far:

from django.utils.translation import get_language
from asgiref.sync import SyncToAsync

import json
import functools
import asyncio
import concurrent.futures
import rx
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler

from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer

from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders


schema = gqsettings.SCHEMA


class GQLConsumer(AsyncConsumer):
    # NOTE: asgiref.SyncToAsync for django ORM

    def __init__(self, scope):
        super().__init__(scope)
        # keeps a record of streams
        self.subscriptions = {}
        # keeps a record of groups the connection belongs to
        self.groups = {}
        # scheduler
        # self.scheduler = AsyncIOScheduler()

    # @allowed_hosts_only
    async def websocket_connect(self, event):
        # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
        #  TODO: This might need some security, auth users or apps only <10-11-17> #
        await self.send({
            "type": "websocket.accept",
            "subprotocol": "graphql-ws",
        })

    async def websocket_receive(self, message):
        # message is gone from the call signature, need to inspect the content of text_data and bytes_data
        request = json.loads(message['text'])

        if request['type'] == 'connection_init':
            await self.send(
                {
                    'type': 'websocket.send',
                    'text': json.dumps({'type': 'connection_ack'})
                })
        elif request['type'] == 'start':
            payload = request.get('payload')
            id = request.get('id')

            # This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
            message = dict()
            message['id'] = id
            message['reply_channel'] = self.channel_name
            message['scope'] = self.scope
            message['groups'] = self.groups
            message['dataloaders'] = DataLoaders(get_language())

            stream = Subject()
            #  TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
            #  TODO: Implement timeout mechanism <10-11-17> #
            # result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
            result = asyncio.wait_for(
                                      schema.execute(
                                          payload['query'],
                                          operation_name=request['operationName'],
                                          variable_values=payload['variables'],
                                          executor=concurrent.futures.ThreadPoolExecutor(),
                                          root_value=rx.Observable.create(stream).share(),
                                          allow_subscriptions=True,
                                          **{'context_value': message})
                                      )

            if isinstance(result, rx.Observable):
                result = result.publish().auto_connect()
                result.subscribe(functools.partial(self._send_result, id))
                self.subscriptions[id] = stream
            else:
                self._send_result(id, result)
        elif request['type'] == 'stop':
            operationName = request.get('operationName')
            await self.channel_layer.group_discard(operationName, self.channel_name)

    async def websocket_disconnect(self):
        for group in self.groups.keys():
            await self.channel_layer.group_discard(group, self.channel_name)

        await self.send({
            "type": "websocket.close", "code": 1000
        })
        raise StopConsumer()

    async def _send_result(self, id, result):
        errors = result.errors

        await self.send({
            'type': 'websocket.send',
            'text': json.dumps({
                'id': id,
                'type': 'data',
                'payload': {
                    'data': result.data,
                    'errors': list(map(str, errors)) if errors else None,
                }
            })
        })

    def model_changed(self, message):
        forwhat = message['models']
        ...

OK, I'm a bit closer to having an async consumer. However, there is one small issue.
Let me go into detail before you get to the code. Currently I am attempting to use an async executor to run the schema. That, however, causes the result to be not an observable but an ExecutionResult, as the pdb session below shows.

The errors property returns this:

[RuntimeError('This event loop is already running',)]

So I would expect the schema not to execute, since this error should mark the end of the execution and nothing should be resolved. However, some strange behaviour occurs.
I set a breakpoint inside the resolve method for the subscription, and after the schema executes and the result is returned, that breakpoint is hit:

> /app/apple/graphquery/consumers.py(74)websocket_receive()
-> result = schema.execute(
(Pdb) c
> /app/apple/graphquery/consumers.py(84)websocket_receive()
-> if isinstance(result, rx.Observable):
(Pdb) result
<graphql.execution.base.ExecutionResult object at 0x7f901c35f488>
(Pdb) result.errors
[RuntimeError('This event loop is already running',)]
(Pdb) c
> /app/apple/product/schema.py(179)resolve_sub_product()
-> await make_sub(info, input.get('product'))
(Pdb) ll
177         async def resolve_sub_product(self, info, **input):
178             __import__('pdb').set_trace()
179  ->         await make_sub(info, input.get('product'))
180             name = ProductType._meta.model.__class__.__name__
181
182             stream = info.root_value
183             return stream.map(lambda message: self.next(message, info, **input))
(Pdb)
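For context, one common way to get this exact RuntimeError is to ask an event loop that is already running to run something to completion. Whether that is what happens inside the executor here is an assumption; the sketch below only reproduces the message with the standard library, and `run_nested` is a hypothetical name.

```python
import asyncio


async def run_nested():
    """Try to block on a future from inside the loop that is already running."""
    loop = asyncio.get_running_loop()
    pending = loop.create_future()  # something we would like to wait for
    try:
        # Blocking on the running loop from within one of its own coroutines.
        loop.run_until_complete(pending)
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = None
    finally:
        pending.cancel()
    return message
```

Inside a coroutine the non-blocking alternative is to await the future, or to schedule work with loop.create_task.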

Here is what the updated consumer looks like:

from django.utils.translation import get_language
from asgiref.sync import SyncToAsync

from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler

from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer

from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql
from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders

import rx
import json
import functools
import asyncio


schema = gqsettings.SCHEMA


class GQLConsumer(AsyncConsumer):
    # NOTE: asgiref.SyncToAsync for django ORM

    def __init__(self, scope):
        super().__init__(scope)
        # keeps a record of streams
        self.subscriptions = {}
        # keeps a record of groups the connection belongs to
        self.groups = {}
        # self.executor = AsyncioExecutor(loop=asyncio.get_event_loop())

    # @allowed_hosts_only
    async def websocket_connect(self, event):
        # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
        #  TODO: This might need some security, auth users or apps only <10-11-17> #
        await self.send({
            "type": "websocket.accept",
            "subprotocol": "graphql-ws",
        })

    async def websocket_receive(self, message):
        # message is gone from the call signature, need to inspect the content of text_data and bytes_data
        request = json.loads(message['text'])

        if request['type'] == 'connection_init':
            await self.send(
                {
                    'type': 'websocket.send',
                    'text': json.dumps({'type': 'connection_ack'})
                })
        elif request['type'] == 'start':
            payload = request.get('payload')
            id = request.get('id')

            # This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
            message = dict()
            message['id'] = id
            message['reply_channel'] = self.channel_name
            message['scope'] = self.scope
            message['subscribe'] = functools.partial(self._subscribe, id)
            message['dataloaders'] = DataLoaders(get_language())

            stream = Subject()
            #  TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
            #  TODO: Implement timeout mechanism <10-11-17> #
            # result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
            __import__('pdb').set_trace()
            result = schema.execute(
                          payload['query'],
                          variable_values=payload['variables'],
                          root_value=rx.Observable.create(stream).share(),
                          allow_subscriptions=True,
                          executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
                          **{'context_value': message})
            isinstance(result, rx.Observable)
            __import__('pdb').set_trace()

            if isinstance(result, rx.Observable):
                result = result.publish().auto_connect()
                result.subscribe(functools.partial(self._send_result, id))
                self.subscriptions[id] = stream
            else:
                self._send_result(id, result)
        elif request['type'] == 'stop':
            await self._unsubscribe(request)

    async def websocket_disconnect(self, *args, **kwargs):
        for group in self.groups.keys():
            await self.channel_layer.group_discard(group, self.channel_name)

        await self.send({
            "type": "websocket.close", "code": 1000
        })
        raise StopConsumer()

    def _subscribe(self, id, gp_name):
        group = self.groups.setdefault(gp_name, set())
        self.groups[gp_name].add(id)

    async def _unsubscribe(self, request):
        operationName = request.get('operationName')
        await self.channel_layer.group_discard(operationName, self.channel_name)
        id = request.get('id')
        del self.subscriptions[id]

    async def _send_result(self, id, result):
        errors = result.errors
        await self.send({
            'type': 'websocket.send',
            'text': json.dumps({
                'id': id,
                'type': 'data',
                'payload': {
                    'data': result.data,
                    'errors': list(map(str, errors)) if errors else None,
                }
            })
        })

    async def model_changed(self, message):
        __import__('pdb').set_trace()
        gp_name = message['gp_name']
        pk = message['pk']

        for id in self.groups.get(gp_name, []):
            stream = self.subscriptions.get(id)
            if not stream:
                continue
            stream.on_next((pk, model))

The mysterious async subscription that executes after it fails to execute:

class ProductSubscritption(object):

    """test"""
    sub_product = graphene.Field(
        ProductType,
        description='subscribe to updated product',
        product=graphene.ID())

    async def resolve_sub_product(self, info, **input):
        __import__('pdb').set_trace()
        await make_sub(info, input.get('product'))
        name = ProductType._meta.model.__class__.__name__

        stream = info.root_value
        return stream.map(lambda message: self.next(message, info, **input))

    @classmethod
    def next(cls, message, info, **input):
        # here the message comes from the stream, but info and **input come from
        # subscribing the next method to the stream
        inst = relay.Node.get_node_from_global_id(info, input.get('product'))
        return inst

And the signal register. This doesn't seem to be related to the issue I'm having, but just in case you would like to look at it:

from asgiref.sync import AsyncToSync
from channels.layers import get_channel_layer

from graphene import relay
from graphql_relay import from_global_id as fgi  # , to_global_id
from promise.dataloader import DataLoader
from promise import Promise

import json

channel_layer = get_channel_layer()
group_send = AsyncToSync(channel_layer.group_send)


def send_update(sender, instance, created, attr='pk', *args, **kwargs):
    value = str(getattr(instance, attr))
    payload = {
        'type': 'model.changed',
        'pk': instance.pk,
        'attr': value,
        'created': created,
    }
    # if the instance was created, send to channels listening for created
    if created:
        group_send(
                'gqp.{0}-add'.format(str.lower(instance.__class__.__name__)),
                payload
            )
        return
    # pk is preferred if available over the provided attr
    if hasattr(instance, 'pk'):
        name = getattr(instance, 'pk')
    else:
        name = value
    gp_name = 'gqp.{0}-updated.{1}'.format(str.lower(instance.__class__.__name__), name)
    payload['gp_name'] = gp_name

    # If the item was updated, send signal to channels listening for updates
    group_send(
            gp_name,
            payload
        )

async def make_sub(info, gid):
    inst = relay.Node.get_node_from_global_id(info, gid)
    try:
        gp_name = 'gqp.{0}-updated.{1}'.format(str.lower(inst.__class__.__name__), inst.pk)
        # Group(gp_name).add(info.context.reply_channel)
        # info.context.channel_session['Groups'] = ','.join( (gp_name, info.context.channel_session['Groups']))
        await channel_layer.group_add(
                gp_name,
                info.context['reply_channel']
            )
        subscribe = info.context['subscribe']
        if subscribe:
            subscribe(gp_name)
    except:
        pass

Here I'm going to tag some people:
@tricoder42 @prokher @hballard @syrusakbary

Getting closer: https://github.com/graphql-python/graphql-core/issues/63#issuecomment-396017113

I figured it out.
I just needed this line to get the async observable working:

result.subscribe(lambda t: loop.create_task(self._send_result(id, t)))
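Why that line works: an observable calls its subscribers synchronously, so an async sender has to be scheduled on the loop rather than called directly. The sketch below shows the pattern with the standard library only. `TinySubject` is a hypothetical stand-in for an Rx subject, and `send_result` stands in for the consumer's `_send_result`.

```python
import asyncio


class TinySubject:
    """Minimal stand-in for an Rx subject: calls observers synchronously."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        for observer in self._observers:
            observer(value)


async def send_result(sent, sub_id, result):
    await asyncio.sleep(0)  # stands in for `await self.send(...)`
    sent.append((sub_id, result))


async def demo():
    loop = asyncio.get_running_loop()
    sent, tasks = [], []
    subject = TinySubject()
    # The callback is synchronous, so it schedules the coroutine as a task
    # instead of calling it (which would only create an un-awaited coroutine).
    subject.subscribe(
        lambda t: tasks.append(loop.create_task(send_result(sent, "1", t)))
    )
    subject.on_next("a")
    subject.on_next("b")
    await asyncio.gather(*tasks)
    return sent
```

Keeping references to the created tasks, as done here with `tasks`, also makes it possible to await or cancel them when the socket closes.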

Currently using a workaround for this issue: https://github.com/graphql-python/graphene-django/issues/452

This is the workaround: instead of using subscriptions directly, I use them to trigger regular queries.

  subscribeToNewMessages() {
    this.subscription = this.props.data.subscribeToMore({
      document: gql`
        subscription ChangeInProduct($id: ID!) {
          subProduct(product: $id) {
            title
          }
        }
      `,
      variables: { id: this.props.match.params ? this.props.match.params.id : null },
      updateQuery: (prev, { subscriptionData }) => {
        if (!subscriptionData.data) {
          return prev;
        }
        console.log('interesting', prev, subscriptionData);
        this.props.data.refetch()
        return prev;
      },
    });
  }

JFYI: We have finally published our in-house subscriptions implementation, DjangoChannelsGraphqlWs. We actually implemented a simple WebSocket-based GraphQL server on top of Django Channels. Subscriptions are implemented in a Graphene-like style. The work is still in progress, but someone will probably find it useful.

Ahhh! I have been waiting for this. @prokher I will definitely check it out.

@Musbell Be careful, it is not ready for production yet. We are currently working hard to improve it (adding asynchronous message handling, ordering/serialization, examples, etc.), and we are happy to receive any feedback you have.

@prokher thanks for the notice. :+1:

Subscriptions should be in Graphene 2.0+; @syrusakbary mentioned it in this tweet: https://twitter.com/syrusakbary/status/923325568157859840?lang=en
I can't find the function in the repo or any documentation. Let's hope we get an update soon.

@mvanlonden Correct me if I'm wrong, but tag v2.0.0 has been out for a while,
and I can see no commit mentioning subscriptions, either in the commits or in the tags that follow.

Could you provide an example implementation of a subscription?
Would one need to use channels in Django for this?

@japrogramer It is documented poorly, if at all, but documentation needs are tracked here. This commit added support for subscriptions.

https://github.com/eamigo86/graphene-django-subscriptions is an example of using graphene-django with subscriptions, although we'd like to implement the functionality natively in graphene-django.

Mind submitting a PR with documentation for subscriptions?

@mvanlonden That has been possible for a while; I thought your announcement was about having the messages delivered via graphene.

I'm not sure this issue should be closed, since the Apollo integration still takes some setup,
even with a simple lib.

Case in point.

At any rate, here is another successful attempt.
This time I wrote it for aiohttp:

from graphql import GraphQLError
from aiograph.redistools.pubsub import subscribe, reader
from aiograph.redistools.utils import create_redis

from graphene.types import generic
import graphene
import asyncio


class TestType(graphene.ObjectType):
    name = "test"
    msg = graphene.String()


class Query(object):
    test_field = graphene.Field(TestType, msg=graphene.String())

    async def resolve_test_field(self, info, **args):
        msg = args.get('msg', None)
        return TestType(msg=msg)


class MessageMutation(graphene.Mutation):
    status = graphene.Int()
    errors = generic.GenericScalar()

    class Arguments:
        channel = graphene.String(required=True)
        msg = graphene.String(required=True)

    @staticmethod
    async def mutate(root, info, **input):
        try:
            redis = await create_redis()
            await redis.publish(
                input.get('channel'),
                input.get('msg'))
            return MessageMutation(status=200, errors=None)
        except Exception as e:
            breakpoint()
            raise GraphQLError('An Error occurred.')

class TestSubscription(object):
    test_updated = graphene.Field(TestType, channel=graphene.String())

    async def resolve_test_updated(root, info, **input):
        loop = asyncio.get_event_loop()
        def next(message):
            return TestType(msg=message)
        redis = await create_redis()
        channel = await subscribe(redis, input.get('channel', 'default'))
        async for i in reader(channel[0]):
            # Here we yield the actual object for the TestType
            yield next(i)

class Mutations(object):
    send_message = MessageMutation.Field()


from contextlib import suppress
from aiohttp_session import get_session
from aiohttp import web, WSMsgType

from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql

from config.settings import settings
from config.schema import schema

import asyncio
import functools
import json
import rx

from .utils import format_response, run_aiograph

# import the logging library
import logging
logger = logging.getLogger('asyncio')

async def graph_handler(request):
    session = await get_session(request)
    last_visit = session['last_visit'] if 'last_visit' in session else None
    if request.content_type in ('application/graphql', 'application/json'):
        payload = json.loads(await request.text())


    with suppress(NameError):
        message = dict()
        message['request'] = request
        result = await run_aiograph(
            payload,
            executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
            allow_subscriptions=False,
            return_promise=True,
            **{'context_value': message}
            )
        response = format_response(result)
        return web.json_response(response)


class WebSocket(web.View):
    def __init__(self, *args, **kwargs):
        self.loop = asyncio.get_event_loop()
        return super().__init__(*args, **kwargs)

    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)

        session = await get_session(self.request)
        # Track open sockets under one consistently named key, as a list.
        if 'websockets' in self.request.app:
            self.request.app['websockets'].append(ws)
        else:
            self.request.app['websockets'] = [ws]

        async for msg in ws:
            if msg.type == WSMsgType.text:
                data = json.loads(msg.data)
                if data['type'] == 'close':
                    await ws.close()
                else:
                    if data['type'] == 'connection_init':
                        logger.debug('connection_init')
                        # aiohttp sends the JSON directly; no channels-style envelope
                        await ws.send_json({'type': 'connection_ack'})
                    if data['type'] == 'start':
                        await self._start(ws, data)
                    if data['type'] == 'stop':
                        await ws.send_str(str(data))
            elif msg.type == WSMsgType.error:
                logger.debug(
                    'ws connection closed with exception %s'
                    % ws.exception())

        if 'websockets' in self.request.app:
            self.request.app['websockets'].remove(ws)
        logger.debug('websocket connection closed')
        return ws

    async def _start(self, ws, data):
            message = dict()
            message['request'] = self.request
            result = await run_aiograph(
                data,
                allow_subscriptions=True,
                return_promise=True,
                **{'context_value': message}
                )
            if isinstance(result, rx.Observable):
                logger.debug('result is an Observable')
                result.subscribe(lambda t: self.loop.create_task(self._send_result(ws, t)))
            else:
                response = format_response(result)
                await ws.send_json(response)

    async def _send_result(self, ws, result):
        await ws.send_json(format_response(result))

Since this was a quick test this sufficed.

<!doctype html>

<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Humble beginnings</title>
  <meta name="description" content="Test bed">
  <meta name="author" content="me">
</head>

<body>
  <script language="JavaScript">
    socket = new WebSocket("ws://" + window.location.host + "/wsgql");
    socket.onmessage = function(e) {
      alert(JSON.parse(e.data));
    }
    socket.onopen = function(){
      alert('Connection to server started')
    }
    socket.onclose = function(event){
      if(event.wasClean){
        alert('Clean connection end')
      }else{
        alert('Connection broken')
      }
    };
    socket.onerror = function(error){
      alert(error);
    }
    socket.onopen = function(){
      alert('Connection to server started')
      var msg = {
        type: 'start',
        query:
        'subscription { testUpdated(channel:"test"){ msg }}',
      }
      socket.send(JSON.stringify(msg));
    }
    console.log('about to send message.')
  </script>
</body>
</html>

@japrogramer: I'm wondering whether it makes sense to ditch graphene subscriptions altogether for the sake of Apollo client integration, if there is no out-of-the-box support for it.

Say, the most minimal setup: Django users running on top of Postgres could simply run a standalone subscriptions-transport-ws with graphql-postgres-subscriptions, and in graphene endpoint mutations we would just directly use Postgres's pub/sub with NOTIFY pinAdded, '{"pinAdded":{"title":"wow", "id":0}}'
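A minimal sketch of how a mutation might build that notification from Python, assuming a DB-API cursor that uses %s placeholders (as psycopg2 and Django's connection.cursor() do). `build_notify` is a hypothetical helper; pg_notify is Postgres's function form of NOTIFY.

```python
import json


def build_notify(channel, payload):
    """Return (sql, params) for a parameterised pg_notify call."""
    # json.dumps keeps the payload valid JSON, and parameter binding
    # avoids hand-quoting it inside the SQL string.
    return "SELECT pg_notify(%s, %s)", (channel, json.dumps(payload))


sql, params = build_notify("pinAdded", {"pinAdded": {"title": "wow", "id": 0}})
```

The pair can then be passed to `cursor.execute(sql, params)`. One detail to watch: a bare `NOTIFY pinAdded` folds the unquoted channel name to lowercase, while pg_notify takes the name as text and keeps its case, so the listener has to use a matching channel name.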

Personally, I wouldn't consider graphql-postgres-subscriptions a suitable replacement for subscription functionality in Graphene. Obviously different people will have different needs, but if I'm building a GraphQL API for a Python project, divorcing my subscriptions backend from the main app introduces some undesirable complications. Unless you're just providing full, open access to your database, it would mean having to re-implement whatever authentication and permissions are involved in accessing your database. If you're using Django, for example, you no longer have access to the authentication backend and would need to re-implement it in Node, at which point why are you bothering with Python/Django/Graphene at all?

@joshourisman: That's a good point, thanks. (I just had the thought of passing temporary subscription tokens to clients on login for the things they can subscribe to, then verifying them upon subscription on the subscriptions-transport-ws side.)

Basically, I have a legacy (somewhat large) Django app. I'm investigating graphene integration on top of it, and hooking up Apollo client with Angular / native iOS / native Android clients, in the hope of reducing the amount of logic that needs to be written to support the APIs. And this seemingly built-in subscription support in Apollo clients across each platform (I pretty much haven't used any GraphQL before today) looks promising at first glance.
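The temporary-token idea could look roughly like the sketch below: an HMAC-signed token listing the topics a user may subscribe to, with an expiry. Everything here (the `SECRET` value, the claim names, the token layout) is an assumption for illustration. In the setup described above, the verifying half would have to be ported to the Node side.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"change-me"  # hypothetical secret shared by issuer and verifier


def issue_token(user_id, topics, ttl=300, now=None):
    """Sign a short-lived list of topics the user may subscribe to."""
    now = time.time() if now is None else now
    claims = {"uid": user_id, "topics": sorted(topics), "exp": int(now + ttl)}
    body = json.dumps(claims, sort_keys=True).encode()
    sig = hmac.new(SECRET, body, hashlib.sha256).digest()
    return (base64.urlsafe_b64encode(body).decode() + "."
            + base64.urlsafe_b64encode(sig).decode())


def verify_token(token, topic, now=None):
    """Check signature, expiry, and that the topic is allowed."""
    now = time.time() if now is None else now
    try:
        body_b64, sig_b64 = token.split(".")
        body = base64.urlsafe_b64decode(body_b64)
        sig = base64.urlsafe_b64decode(sig_b64)
    except ValueError:  # wrong shape or bad base64
        return False
    expected = hmac.new(SECRET, body, hashlib.sha256).digest()
    if not hmac.compare_digest(sig, expected):
        return False
    claims = json.loads(body)
    return claims["exp"] > now and topic in claims["topics"]
```

This covers only the token itself. It does not replace the permission checks discussed above, which would still have to decide which topics go into the token at login.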

@ambientlight You can implement subscriptions with django-channels and Apollo. I've done it; it's not trivial. Here is an example implementation that is similar to mine in the way it delivers the message to the client: https://github.com/datadvance/DjangoChannelsGraphqlWs

Does anyone know when subscriptions will be implemented natively in graphene?

@dspacejs Because graphene is kept separate from the common Python frameworks, it is difficult to implement a delivery system for all of them in this package.

However, I would like to see packages like graphene-django and graphene-flask implement their own handler/view that works for their platform, following the language API
closely enough that it works with Apollo. That way we don't have users trying to roll their own implementation every time someone wants subscriptions, and all the eyes of the community are on one implementation for the best support.

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

I don't get it: is there native subscription support in Graphene or not? Shall I drop my https://github.com/datadvance/DjangoChannelsGraphqlWs, or not yet?

@prokher please don't drop the only one I could get working 😄

Are there any updates on this subject?

Hi everyone! I just released a GraphQL subscriptions implementation for Graphene + Django based on an internal implementation we’ve been using in production for the last 6 months at Jetpack (https://tryjetpack.com/). It builds on @tricoder42's gist implementation and takes inspiration from the great work that @japrogramer, @eamigo86 and @prokher have done. Since this is the canonical Graphene subscriptions issue I thought I would post it here.

https://github.com/jaydenwindle/graphene-subscriptions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Any updates? (Trying to keep the discussion alive.) :)
