Hello @syrusakbary.
Thanks for all your hard work on graphene and graphql-python. Awesome library!!
I posted this on #393 earlier this week...reposting here so it's easier to discover.
I implemented a port of the apollo graphql subscriptions modules (graphql-subscriptions and subscriptions-transport-ws) for graphene / python. They work w/ apollo-client.
It is here.
Same basic api as the Apollo modules. It is still very rough...but works so far, based on my limited internal testing. Uses redis-py, gevent-websockets, and syrusakbary/promises. I was going to add a simple example app, setup.py for easier install, and more info to the readme w/ the API, in the next few days. A brief example is below. Only works on python2 for now. My plan is to start working on tests as well. I figured I'd go ahead and share in this early stage in case anybody is interested...
I'm very new to open source, so any critiques or pull requests are welcome.
Simple example:
Server (using Flask and Flask-Sockets):
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_sockets import Sockets
from .subscription_manager import SubscriptionManager, RedisPubsub
from .subscription_transport_ws import ApolloSubscriptionServer
app = Flask(__name__)
sockets = Sockets(app)
pubsub = RedisPubsub()
schema = graphene.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription
)
subscription_mgr = SubscriptionManager(schema, pubsub)
@sockets.route('/socket')
def socket_channel(websocket):
subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket)
subscription_server.handle()
return []
if __name__ == "__main__":
from geventwebsocket import WebSocketServer
server = WebSocketServer(('', 5000), app)
print ' Serving at host 0.0.0.0:5000...\n'
server.serve_forever()
Of course on the server you have to "publish" each time you have a mutation (in this case to a redis channel). That could look something like this (using graphene / sql-alchemy):
class Subscription(graphene.ObjectType):
users = graphene_sqlalchemy.SQLAlchemyConnectionField(
User,
active=graphene.Boolean()
)
def resolve_users(self, args, context, info):
query = User.get_query(context)
return query.filter_by(id=info.root_value.get('id'))
class AddUser(graphene.ClientIDMutation):
class Input:
username = graphene.String(required=True)
email = graphene.String()
ok = graphene.Boolean()
user = graphene.Field(lambda: User)
@classmethod
def mutate_and_get_payload(cls, args, context, info):
_input = args.copy()
del _input['clientMutationId']
new_user = UserModel(**_input)
db.session.add(new_user)
db.session.commit()
ok = True
if pubsub.subscriptions:
pubsub.publish('users', new_user.as_dict())
return AddUser(ok=ok, user=new_user)
Client (using react-apollo client):
import React from 'react'
import ReactDOM from 'react-dom'
import { graphql, ApolloProvider } from 'react-apollo'
import gql from 'graphql-tag'
import ApolloClient, { createNetworkInterface } from 'apollo-client'
import { SubscriptionClient, addGraphQLSubscriptions } from 'subscriptions-transport-ws'
import ChatApp from './screens/ChatApp'
import ListBox from '../components/ListBox'
const SUBSCRIPTION_QUERY = gql`
subscription newUsers {
users(active: true) {
edges {
node {
id
username
}
}
}
}
`
const LIST_BOX_QUERY = gql`
query AllUsers {
users(active: true) {
edges {
node {
id
username
}
}
}
}
`
class ChatListBox extends React.Component {
componentWillReceiveProps(newProps) {
if (!newProps.data.loading) {
if (this.subscription) {
return
}
this.subscription = newProps.data.subscribeToMore({
document: SUBSCRIPTION_QUERY,
updateQuery: (previousResult, {subscriptionData}) => {
const newUser = subscriptionData.data.users.edges
const newResult = {
users: {
edges: [
...previousResult.users.edges,
...newUser
]
}
}
return newResult
},
onError: (err) => console.error(err)
})
}
}
render() {
return <ListBox data={this.props.data} />
}
}
const ChatListBoxWithData = graphql(LIST_BOX_QUERY)(ChatListBox)
export default ChatListBoxWithData
const networkInterface = createNetworkInterface({
uri: 'http://localhost:5000/graphql'
})
const wsClient = new SubscriptionClient(`ws://localhost:5000/socket`, {
reconnect: true
})
const networkInterfaceWithSubscriptions = addGraphQLSubscriptions(
networkInterface,
wsClient,
)
const client = new ApolloClient({
dataIdFromObject: o => o.id,
networkInterface: networkInterfaceWithSubscriptions
})
ReactDOM.render(
<ApolloProvider client={client}>
<ChatApp />
</ApolloProvider>,
document.getElementById('root')
)
This is awesome!
I will take a closer look next week and provide some feedback then :)
@syrusakbary Thoughts?
Nice!!!
@syrusakbary Any update on this?
I took a look in the implementation and besides some small nits (like Apollo* naming) it looked good!
I'm waiting to few things:
Thanks for the comments @syrusakbary . I think your grapqhl / graphene / promises libraries are amazing...any constructive criticism you have I'm happy to hear. I only used the "Apollo.." naming convention because my implementation was initially based on their (Apollo's) graphql subscription transport protocol. But, as the final spec should be merged soon, I'm happy to drop that convention, since it only affects the main subscription transport class. I've been tied up the last couple months, since I published this, so I haven't been able to devote much time to improving it. Some priorities in the near term for me:
Thanks!
@hballard @syrusakbary With the official spec now merged, is there room for contributions on getting subscriptions implemented? I'm interested in using graphene with a new django project, and this would be huge.
Indeed. This would make Django a great fit for angular2 real time apps
@Helw150 - I can't speak for @syrusakbary, but I know I'd welcome any contributions on my repo. Also, not sure if @syrusakbary is interested in integrating it into graphene eventually, just prefers to fork it, or go his own way. I started this mainly as a hobby project, when I was playing w/ Apollo subscriptions and noticed their wasn't an implementation for graphene (python being my preferred server language). I just pushed a commit to the "tests" branch with about half the subscriptions-transport tests and all the subscriptions-manager tests were added a few weeks ago. The rest of the transport tests should be easier to finish up now, the initial test setup for those slowed me down a bit--some of it was new for me. I haven't had a ton of time to devote to this and I don't really use django...so not sure when I would get to that. My next focus would be Python 3 compatibility...which might not be that difficult. Of course now that the initial commits for graphql subscriptions have been added to the spec, probably lot's more need to be done outside of these focues. You can read my summary of the my transport tests commit on the issue I created for them here
Update: Initial tests have been added as of last weekend (see commit here and I merged a commit last week that added Python 3 compatibility (2.7, 3.4, 3.5, & 3.6). My next two priorities are adding additional executors (threads, asyncio, etc.) and some type of Django compatibility. I don't really use Django...for those that do...is channels the preferred method for adding real-time services to Django now or something like django-websocket-redis or django-socketio? I've been doing a little reading on Django and channels...
Great work @hballard. I'd like to use this as inspiration for an example in my ReactQL starter kit to show how subscriptions can be served from a non-Node.js server.
@leebenson - Very cool. Any feedback you can provide is appreciated. Let me know if I can be of assistance.
Per @syrusakbary previous comment above, "Having a subscriptions integration with Django so we assure that the subscriptions structure is abstracted in a scalable way..."; I've been thinking about the best way to do that.
My thought is that it should be fairly straightforward to generalize the concurrency executor methods like @syrusakbary did in graphql-core and have separate executor classes for each concurrency library (probably will even borrow some of the logic in graphql-core for each one). Then the RedisPubsub and SubscriptionTransport classes would utilize the corresponding executor passed in by the user when they instantiate each class. I'd welcome any feedback anyone has (@syrusakbary or others) on this structure.
I spent a couple hours reading through the Django channels library this weekend and it would seem it could be treated as just another executor class under this model. Also, anyone familiar w/ Django...seems like I would utilize a "class-based consumer" (inherit from WebsocketConsumer) for the SubscriptionServer. The redis pubsub "wait_and_get_message" method in the RedisPubsub class could be implemented as another consumer. Thoughts on this (from anyone more familiar w/ Django channels)?
How about getting someone from the django core team to assist?
I think reactive programming is something that would bring new people to django which would benefit in the long run. It was the reason for me to look into other solutions.
What do you think?
I'm happy to have any assistance from another contributor--particularly a django core. I haven't reached out to them since I don't utilize django and I needed to abstract the concurrency executor from the base subscriptions manager and websocket server logic, in order to use other concurrency frameworks (like django channels). It would be fairly straightforward to integrate the current gevent version with django-socketio or create a simple package similar to flask-sockets to integrate geventwebsocket into django directly. I found a small library on bitbucket that seems to do just that -- django-gevent-websocket (here).
I'm currently working on abstracting the currency executor to be able to use asyncio for concurrency as well (vs the current gevent). I should merge a version of that with master in the next week or so (which will allow use w/ Sanic web framework using uvloop / asyncio)...and then I was going to turn my attention to django integration, using the same abstraction base. But my plan was to focus more on django-channels...since that seems to be the way forward for django concurrency.
Thank you for the update @hballard that sounds really awesome. :D
I am using django in some projects and would love to replace drf (django-rest-framework) with what you are building. I really prefer reactive programming and I think a graphQL api with subscriptions would totally add a lot of value on top of the django project.
I am not a core django person nor do I have experience with django-channels.. I do however have practical experience with django. Feel free to reach out me if it helps. ;)
@hballard will you be at EuroPython? maybe we can find someone to help during the sprints! :)
Afraid not...I live in Texas (US)...and that would be a bit of hike!
Did you get to give it a try yet? :)
@Eraldo @hballard Yes, I think I should post an update here as I'm working full on subscriptions now.
Some thoughts about my journey: the way Apollo-Subscriptions use to manage subscriptions was not very friendly for the developer, needing to hack around the resolution and a specific PubSub implementation that was "bypassing" the GraphQL engine for adapting it into subscriptions.
The reason for that is the GraphQL-js engine was not ready for subscriptions (meaning that was only able to return either a promise or a static value, but not a async iterator).
However GraphQL-js recently added a way to subscribing to a GraphQL query (that return an async iterator a.k.a. Observable) that pushed towards simpler and cleaner implementations of subscriptions that decouple the subscription resolution from the "listener" on the subscription.
That led to better implementations of the transport mechanisms in GraphQL subscriptions like subscriptions-transport-ws.
So, in summary, subscriptions is something that should be bundled fully into the GraphQL engine, in a way that is easy to plug any mechanisms, such as:
That don't require any specific pub/sub implementation and, eventually, let this decision to the developer (in case it want to use it).
For the next version of Graphene, 2.0 I plan to have subscriptions bundled into the Engine :)
There is already a branch in graphql-core where I'm doing the research process.
I will keep updating this thread with more information as I keep working on it.
@syrusakbary what ended being the recommended approach on this? I'm using graphql python and loving it, and I'm working on an iot device project where they use mqtt for data observations. Curious if there is a recommended approach or if it is 'do what thou wilt' for subscriptions in graphql python.
Thanks!
can we have an example of how subscriptions works with Graphene in Django?
@AgentChris
Here is how i managed it https://github.com/graphql-python/graphene/pull/500#issuecomment-325560994
also look at https://github.com/graphql-python/graphql-core/issues/149
thanks a los, i was trying to find an example for the last 2 days
Hi @AgentChris, take a look at this module, maybe you might be interested:
graphene-django-subscriptions
I would like to know what is the progress in the implementation of subscriptions into the core of Graphene, could someone clarify please?
@Oxyrus it seems that the mechanism has been decided to be rx observables but the method for delivery to the client is still up to you.
Here's a gist with my solution using django-channels. It's a working proof of concept. Next task is optimize it for production environment. Feedback welcome!
@tricoder42 interesting code layout, I was using observables to emit subscriptions but I was having problems making django pretend to be async .. Your aproach seems cleaner.
To avoid repeating queries you could use promises and dataloaders to group as much repetitive work to just one computation.
@japrogramer What do you mean by dataloaders to group?
The main problem is that workers run in different processes, so I need to run query at least once per process. Second problem is that workers are intended to be short-lived, so I can't take it granted that parsed query is available. Worker can be restarted anytime.
@tricoder42 So Is there hope of getting this into graphene or not? Should it be spun off into its own library?
The only part which would fit into graphene is Subscription class. Everything else depends on backend (django-channels or redis), so it might be better to either keep it in separate package or locally in project. Anyway, django-channels 2.0 are on the way which makes imlementation a bit cleaner.
@tricoder42 So Is there hope of getting this into graphene or not? Should it be spun off into its own library?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.
@tricoder42 how does django-channels 2.0 make the implementation cleaner?
@japrogramer I havent' tried it yet, but it should be possible to register custom callbacks to a Group.
Right now it's only possible to add another Channel to a Group and when message is sent, it's automatically broadcasted to all channels in Group. This is enough if you know the shape of data before sending a message. E.g if you have a Serializer (like in REST API), you can listen post_save signal, serialize new instance and broadcast it to the Group.
In GraphQL, however, we don't know what shape of data user requested, so we need to serialize instance for each Channel (subscriber) in a group separately. That's why I'm having two layers of notifications - one for model changes and second for graphql subscriptions. In channels 2.0 it should be possible to register a callback which feeds data to Observable and returns serialized data.
After first real-world implementation and testing I updated the gist with recent code changes:
Still using django-channels 1.x.
Any examples of Graphql/Graphene using subscriptions in a Flask App? Hopefully something comparable with Apollo suite ?
@tricoder42 I just did some experimenting with Channels 2.0 this is what I have ..
Im still digging into the Docs. I also don't have much time this days but Ill update once I get something more substantial going.
Note: line 62, probably needs to be made sync since I don't think on_next can be async
1 from django.utils.translation import get_language
1 import json
2 # import functools
3 import asyncio
4 import rx
5
6 from channels.consumer import AsyncConsumer
7
8 from graphene_django.settings import graphene_settings as gqsettings
9 from .views import DataLoaders
10
11
12 schema = gqsettings.SCHEMA
13
14
15 class GQLConsumer(AsyncConsumer):
16 # NOTE: asgiref.SyncToAsync for django ORM
17
18 async def graphsend(self, opID, result, message):
19 data = result.data
20 await self.send(
21 {
22 'type': 'websocket.send',
23 'text': str(json.dumps({'data': data, 'type': 'data', 'id': opID}))
24 })
25
26 # @allowed_hosts_only
27 async def websocket_connect(self, event):
28 # message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
29 # TODO: This might need some security, auth users or apps only <10-11-17> #
30 await self.send({
31 "type": "websocket.accept",
32 })
33
34 async def websocket_receive(self, message):
35 # message is gone from the call signature, need to inspect the content of text_data and bytes_data
36 clean = json.loads(message['text'])
37 gqtype = clean.get('type')
38 clean = clean.get('payload')
39
40 if gqtype == 'connection_init':
41 await self.send({'type': 'websocket.send', 'text': json.dumps({'type': 'connection_ack'})})
42 elif gqtype == 'start':
43 __import__('pdb').set_trace()
44 self.operationName = clean.get('operationName')
45 self.query = clean.get('query')
46 self.foovar = clean.get('variables')
47
48 # This part acts like a request
49 message = dict()
50 message['reply_channel'] = self.channel_name
51 message['scope'] = self.scope
52 message['dataloaders'] = DataLoaders(get_language())
53 self.kwargs = {'context_value': message}
54
55 # TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
56 # TODO: Implement timeout mechanism <10-11-17> #
57 result = schema.execute(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
58 if isinstance(result, rx.Observable):
59 class MyObserver(rx.Observer):
60
61 def on_next(self, x):
62 self.graphsend(self.operationName, x, message)
63
64 def on_error(self, e):
65 ...
66
67 def on_completed(self):
68 ...
69
70 result = result.publish().auto_connect()
71 result.subscribe(MyObserver())
72 elif gqtype == 'stop':
73 operationName = clean.get('operationName')
74 await self.channel_layer.group_discard(operationName, self.channel_name)
75 else:
76 await self.send({'type': 'websocket.send', 'text': json.dumps({'data': 'connection_ack'})})
77
78 async def websocket_disconnect(self):
79 if 'Groups' in self.scope['session']:
80 for x in self.scope['session']['Groups'].split(','):
81 ...
82 # for every Group in the session, unsubscribe current connection
83 await self.channel_layer.group_discard(x, self.channel_name)
84 # finally del the Groups from the session
85 del self.scope['session']['Groups']
~
@kavnik
Not with flask, but I did build something with Sanic which is flask like if you are interested I can send it to you.
See here: https://github.com/graphql-python/graphene/issues/545
Adam Hopkins
On Feb 7, 2018, 4:13 AM +0200, kavink notifications@github.com, wrote:
Any examples of Graphql/Graphene using subscriptions in a Flask App? Hopefully something comparable with Apollo suite ?
—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub, or mute the thread.
also super interested in anything with flask or flask-like
@ahopkins Please can you post the example someplace. Would love to look at it and maybe port it to flask.
@kavink Here is a gist: https://gist.github.com/ahopkins/52bcd7d15de1e0356ee22f82b6cbf9c8
@ahopkins Thanks ! How does one publish to feed and subscribe from server side ? i.e. how can I modify for functions to either send data into feed to client or listen to client from feed.
@kavink graphene is intended to use RXPY for that part, I'm currently trying to work out how to use that with channels so i don't know that much about it but hopefully it leads you down the right road.
@tricoder42 Thank you for your promising code!
Is it possible to use Group instead of GraphQLSubscriptionStore to store queries (as sending to multiple channels in a group is optimized)?
For example:
# subscribed_groups = {'User': set(), ...}
# subscription_string = transform('subscription mysub on UserNode { ... }');
def on_message(message):
global subscribed_groups;
...
subscription_string = 'sub-%s' % hash(message.content['payload'])
Group(subscription_string).add(reply_channel);
model_name = message.content['model']
subscribed_groups[model_name].add(subscription_string);
...
@receiver(post_save, sender=model)
def send_model_update(...):
global subscribed_groups;
for group_name in subscribed_groups[model.__name__]:
Group(group_name).send(...)
@heyrict Unfortunately you can't in general case, because each client might be subscribed for different data. In other words: two clients might use the same subscription, but still fetch different data. I'm using custom redis store, because I need to keep graphql query around. In django-channels 1x, workers and interface are different processes so I can't data using globals either.
However, in django-channels 2.x this is solved differently, I'm gonna try it soon.
@japrogramer @ahopkins Thanks for gist and the pointer to RXPY, I will certainly look at it. But what im trying to understand how would it all work , Graphene/RXPY and the gist(Graphql subscriptions), basically trying to wrap my head around on it. i.e. should I use RXPY to publish and read from feed ? But I dont see a way to create channels in RXPY, like architecture wise what all components tie together. I can then try to reverse and implement something working .
I just submitted a pull request to graphql-ws that gives an example of a publish - subscription implementation, to make it easier to use subscriptions with graphql-ws. I modified the README and examples to show how it might work. Not sure if @syrusakbary would want this as a part of the graphql-ws library or in a separate one. Here is my fork in case you want to try it out.
@japrogramer How do you broadcast changes in DB? Do you use django signals?
I've updated my project to django-channels 2.x and also the gist.
TL;DR:
django.{app_label}.{model} group.django.{app_label}.{model} (this defaults to subscription's output_type, but might be overridden in Subscription.subscribe method)(pk, model_label) tuple as a root_value. Subscription.next receives this tuple as first parameter and return None or corresponding object from db.I wish I could parse query just a once and pass a coroutine to Observable. At initialization, the coroutine would subscribe to django.{app_label}.{model}, and then I would simply call coroutine.send((pk, model)) and Observer would send serialized data to client. However it seems that whenever I pass iterable to observable, it always tries to consume it whole, even when it should wait for new data. I'm kinda lost here, but it's just an optimization. The implementation works as it is, now I'm just struggling with unit tests.
Any feedback welcome,
cheers!
@tricoder42 you could try using rx.Observable.from_iterable also take a look at this page for examples, here is a direct link to a relevant example: https://github.com/thomasnield/oreilly_reactive_python_for_data/blob/master/class_notes/class_notes.md#44---an-observable-emitting-tweets
Im still trying to wrap my head around observables. I like how if the resolve method for the subscription receives an object instance it publishes a new result.
@japrogramer Thank you for the link! I finally got it, see updated gist.
Key points:
Subscription.next, which returns instance to be sent as a new resultThe pipeline is following:
model_changed - notification is received on model_changed channelStreamObservable.send - calling stream.send pushes new data down the streamSubscription.next - (pk, model) tuple is resolved in model instanceGraphQL Executor - all results from observable are serialized using GraphQL executorGraphqlSubcriptionConsumer._send_result - finally, new data are sent to client@tricoder42 what query are you using with your code? doesn't the model get read from the database for each subscriber?
Did you mean to pass an observer to your stream initiation here L71 ?
@japrogramer Yeah, each subscribers needs to load the instance from DB. It may be optimised for single worker, but once you start scaling up, each subscriber might be handled by different process and you can only pass serializable objects through channels.
StreamObservable could be actually replaced with rx.Subject which does the same.
Example query is:
subscription AlertsSubscription {
alerts {
id,
code,
date,
read
}
}
@tricoder42 how about using dataloaders from promises to fetch the instance so that the db is only hit once per process?
Dear @syrusakbary on July 30 you made a great comment about the place subscriptions implementation must be, you also told that you start the research in the branch. I just wanted to ask for an update. Did you succeed in your research? Shall we expect subscriptions to be the part of Graphene library eventually? I see there are couple implementations available (e.g. GraphQL Subscription with django-channels, graphene-django-subscriptions), but none of them can be used in production out of the box (by different reasons).
@prokher actually I'm using @tricoder42 's implementation for graphql subscription in production (with minor modifications as I'm using python3.5 and redis backend) , and it turns out working perfectly (and much cleaner than graphene-django-subscriptions).
Making bridges between two packages may be time consuming, but if you want to use subscription right now, I recommend you to simply migrate the code to your project, as it won't be much work.
@heyrict Thank you very much, we came to the same conclusion. The code in the gist indeed very simple and clean. That is definitely way to go. Manu thanks to @tricoder42.
BTW, just for information: the protocol used by the subscriptions-transport-ws (WebSocket-based transport used by Apollo subscriptions) is described here.
What do you think about not sending useless data back to the client?
Actually, I am having lots of subscription objects with {'subscriptionName': None}.
def _send_result(self, id, result):
# Don't send results if no useful data is generated
errors = result.errors
if not errors:
if not isinstance(result.data, dict):
return
if sum(map(lambda x: x != None, result.data.values())) == 0:
return
self.send({
'type':
'websocket.send',
'text':
json.dumps({
'id': id,
'type': 'data',
'payload': {
'data': result.data,
'errors': list(map(str, errors)) if errors else None,
}
})
})
hey quick follow up, has anyone managed to make an asyncronous consumer for subscriptions ?
currently im using concurrent threads as the executor but im switch over to
from graphql.execution.executors.asyncio import AsyncioExecutor
Im mocking out some code .. here is what i have so far
from django.utils.translation import get_language
from asgiref.sync import SyncToAsync
import json
import functools
import asyncio
import concurrent.futures
import rx
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler
from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders
schema = gqsettings.SCHEMA
class GQLConsumer(AsyncConsumer):
# NOTE: asgiref.SyncToAsync for django ORM
def __init__(self, scope):
super().__init__(scope)
# keeps a record of streams
self.subscriptions = {}
# keeps a record of groups the connection belongs to
self.groups = {}
# scheduler
# self.scheduler = AsyncIOScheduler()
# @allowed_hosts_only
async def websocket_connect(self, event):
# message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
# TODO: This might need some security, auth users or apps only <10-11-17> #
await self.send({
"type": "websocket.accept",
"subprotocol": "graphql-ws",
})
async def websocket_receive(self, message):
# message is gone from the call signature, need to inspect the content of text_data and bytes_data
request = json.loads(message['text'])
if request['type'] == 'connection_init':
await self.send(
{
'type': 'websocket.send',
'text': json.dumps({'type': 'connection_ack'})
})
elif request['type'] == 'start':
payload = request.get('payload')
id = request.get('id')
# This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
message = dict()
message['id'] = id
message['reply_channel'] = self.channel_name
message['scope'] = self.scope
message['groups'] = self.groups
message['dataloaders'] = DataLoaders(get_language())
stream = Subject()
# TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
# TODO: Implement timeout mechanism <10-11-17> #
# result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
result = asyncio.wait_for(
schema.execute(
payload['query'],
operation_name=request['operationName'],
variable_values=payload['variables'],
executor=concurrent.futures.ThreadPoolExecutor(),
root_value=Observable.create(stream).share(),
allow_subscriptions=True,
**{'context_value': message})
)
if isinstance(result, rx.Observable):
result = result.publish().auto_connect()
result.subscribe(functools.partial(self._send_result, id))
self.subscriptions[id] = stream
else:
self._send_result(id, result)
elif request['type'] == 'stop':
operationName = request.get('operationName')
await self.channel_layer.group_discard(operationName, self.channel_name)
async def websocket_disconnect(self):
for group in self.groups.keys():
await self.channel_layer.group_discard(group, self.channel_name)
await self.send({
"type": "websocket.close", "code": 1000
})
raise StopConsumer()
async def _send_result(self, id, result):
errors = result.errors
await self.send({
'type': 'websocket.send',
'text': json.dumps({
'id': id,
'type': 'data',
'payload': {
'data': result.data,
'errors': list(map(str, errors)) if errors else None,
}
})
})
def model_changed(self, message):
forwhat = message['models']
...
ok Im a bit closer to having an async consumer.. however there is one small issue.
Let me go into detail before you get to the code. currently I am attempting to use an async executor to run the schema .. that however causes the result not to return an observable but a
the errors property returns this
[RuntimeError('This event loop is already running',)]
so I would expect that the schema not to execute since these error should mark the end of the execution and nothing should be resolve .. however some strange behaviour occours ..
I set a brake point inside of the resolve method for the subscription and after the schema executes and is returned that brake point is hit
> /app/apple/graphquery/consumers.py(74)websocket_receive()
-> result = schema.execute(
(Pdb) c
> /app/apple/graphquery/consumers.py(84)websocket_receive()
-> if isinstance(result, rx.Observable):
(Pdb) result
<graphql.execution.base.ExecutionResult object at 0x7f901c35f488>
(Pdb) result.errors
[RuntimeError('This event loop is already running',)]
(Pdb) c
> /app/apple/product/schema.py(179)resolve_sub_product()
-> await make_sub(info, input.get('product'))
(Pdb) ll
177 async def resolve_sub_product(self, info, **input):
178 __import__('pdb').set_trace()
179 -> await make_sub(info, input.get('product'))
180 name = ProductType._meta.model.__class__.__name__
181
182 stream = info.root_value
183 return stream.map(lambda message: self.next(message, info, **input))
(Pdb)
here is what the updated consumer looks like
from django.utils.translation import get_language
from asgiref.sync import SyncToAsync
from rx.subjects import Subject
from rx.concurrency import AsyncIOScheduler
from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql
from graphene_django.settings import graphene_settings as gqsettings
from .views import DataLoaders
import rx
import json
import json
import functools
import asyncio
import functools
import asyncio
schema = gqsettings.SCHEMA
class GQLConsumer(AsyncConsumer):
# NOTE: asgiref.SyncToAsync for django ORM
def __init__(self, scope):
super().__init__(scope)
# keeps a record of streams
self.subscriptions = {}
# keeps a record of groups the connection belongs to
self.groups = {}
# self.executor = AsyncioExecutor(loop=asyncio.get_event_loop())
# @allowed_hosts_only
async def websocket_connect(self, event):
# message.reply_channel.send({'accept': True, 'text': json.dumps({'type': 'connection_ack'})})
# TODO: This might need some security, auth users or apps only <10-11-17> #
await self.send({
"type": "websocket.accept",
"subprotocol": "graphql-ws",
})
async def websocket_receive(self, message):
# message is gone from the call signature, need to inspect the content of text_data and bytes_data
request = json.loads(message['text'])
if request['type'] == 'connection_init':
await self.send(
{
'type': 'websocket.send',
'text': json.dumps({'type': 'connection_ack'})
})
elif request['type'] == 'start':
payload = request.get('payload')
id = request.get('id')
# This part acts like a request, QUESTION: should this be a dict like object that set/get from the self.scope?
message = dict()
message['id'] = id
message['reply_channel'] = self.channel_name
message['scope'] = self.scope
message['subscribe'] = functools.partial(self._subscribe, id)
message['dataloaders'] = DataLoaders(get_language())
stream = Subject()
# TODO: Implement weight, can this query run for this user or is it too expensive <10-11-17> #
# TODO: Implement timeout mechanism <10-11-17> #
# result = await SyncToAsync(schema.execute)(self.query, variable_values=self.foovar, allow_subscriptions=True, **self.kwargs)
__import__('pdb').set_trace()
result = schema.execute(
payload['query'],
variable_values=payload['variables'],
root_value=rx.Observable.create(stream).share(),
allow_subscriptions=True,
executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
**{'context_value': message})
isinstance(result, rx.Observable)
__import__('pdb').set_trace()
if isinstance(result, rx.Observable):
result = result.publish().auto_connect()
result.subscribe(functools.partial(self._send_result, id))
self.subscriptions[id] = stream
else:
self._send_result(id, result)
elif request['type'] == 'stop':
await _unsubscribe(request)
async def websocket_disconnect(self, *args, **kwargs):
for group in self.groups.keys():
await self.channel_layer.group_discard(group, self.channel_name)
await self.send({
"type": "websocket.close", "code": 1000
})
raise StopConsumer()
def _subscribe(self, id, gp_name):
group = self.groups.setdefault(gp_name, set())
self.groups[gp_name].add(id)
async def _unsubscribe(self, request):
operationName = request.get('operationName')
await self.channel_layer.group_discard(operationName, self.channel_name)
id = request.get('id')
del self.subscriptions[id]
async def _send_result(self, id, result):
errors = result.errors
await self.send({
'type': 'websocket.send',
'text': json.dumps({
'id': id,
'type': 'data',
'payload': {
'data': result.data,
'errors': list(map(str, errors)) if errors else None,
}
})
})
async def model_changed(self, message):
__import__('pdb').set_trace()
gp_name = message['gp_name']
pk = message['pk']
for id in self.groups.get(gp_name, []):
stream = self.subscriptions.get(id)
if not stream:
continue
stream.on_next((pk, model))
The misterous async subscription that executes after it fails to execute ...
class ProductSubscritption(object):
"""test"""
sub_product = graphene.Field(
ProductType,
description='subscribe to updated product',
product=graphene.ID())
async def resolve_sub_product(self, info, **input):
__import__('pdb').set_trace()
await make_sub(info, input.get('product'))
name = ProductType._meta.model.__class__.__name__
stream = info.root_value
return stream.map(lambda message: self.next(message, info, **input))
@classmethod
def next(cls, message, info, **input):
# here the message comes from the stream but the info and **input come frome
# subscribing to the next method to the stream
inst = relay.Node.get_node_from_global_id(info, input.get('product'))
return inst
and the signal register, this doesn't seem to be related to the issue im having but just in case you would like to look at it
from asgiref.sync import AsyncToSync
from channels.layers import get_channel_layer
from graphene import relay
from graphql_relay import from_global_id as fgi # , to_global_id
from promise.dataloader import DataLoader
from promise import Promise
import json
channel_layer = get_channel_layer()
group_send = AsyncToSync(channel_layer.group_send)
def send_update(sender, instance, created, attr='pk', *args, **kwargs):
value = str(getattr(instance, attr))
payload = {
'type': 'model.changed',
'pk': instance.pk,
'attr': value,
'created': created,
}
# if the instance was created, send to channels listening for created
if created:
group_send(
'gqp.{0}-add'.format(str.lower(instance.__class__.__name__)),
payload
)
return
# pk is prefered if available over the provided attr
if hasattr(instance, 'pk'):
name = getattr(instance, 'pk')
else:
name = value
gp_name = 'gqp.{0}-updated.{1}'.format(str.lower(instance.__class__.__name__), name)
payload['gp_name'] = gp_name
# If the item was updated, send signal to channels listening for updates
group_send(
gp_name,
payload
)
async def make_sub(info, gid):
inst = relay.Node.get_node_from_global_id(info, gid)
try:
gp_name = 'gqp.{0}-updated.{1}'.format(str.lower(inst.__class__.__name__), inst.pk)
# Group(gp_name).add(info.context.reply_channel)
# info.context.channel_session['Groups'] = ','.join( (gp_name, info.context.channel_session['Groups']))
await channel_layer.group_add(
gp_name,
info.context['reply_channel']
)
subscribe = info.context['subscribe']
if subscribe:
subscribe(gp_name)
except:
pass
here im going to tag some people,
@tricoder42 @prokher @hballard @syrusakbary
getting closer : https://github.com/graphql-python/graphql-core/issues/63#issuecomment-396017113
I figured it out,
just needed this line to get async observable working
result.subscribe(lambda t: loop.create_task(self._send_result( id, t)))
currently using a work around for this issue https://github.com/graphql-python/graphene-django/issues/452
This is the work around, instead of using subscriptions directly .. I use them to trigger regular queries
subscribeToNewMessages() {
this.subscription = this.props.data.subscribeToMore({
document: gql`
subscription ChangeInProduct($id: ID!) {
subProduct(product: $id) {
title
}
}
`,
variables: { id: this.props.match.params ? this.props.match.params.id : null },
updateQuery: (prev, { subscriptionData }) => {
if (!subscriptionData.data) {
return prev;
}
console.log('interesting', prev, subscriptionData);
this.props.data.refetch()
return prev;
},
});
}
JFYI: We have eventually published our domestic subscriptions implementation DjangoChannelsGraphqlWs. Actually we implemented a simple WebSocket-based GraphQL server implemented on Django Channels. Subscriptions are implemented in the Graphene-like style. The work is still in progress, but probably someone will find it useful.
Ahhh! i have been waiting for this. @prokher i will definitely check it out.
@Musbell be careful, it is not ready for production yet, currently we are working hard to improve (add asynchronous message handling, ordering/serialization, examples, etc), and we are happy to receive any feedback you have.
@prokher thanks for the notice. :+1:
Subscriptions should be in Graphene2.0+ @syrusakbary mentioned it in this tweet https://twitter.com/syrusakbary/status/923325568157859840?lang=en
I can't find the function in the repo or any documentation, let's hope we get an update soon
Subscriptions introduced in v2! https://github.com/graphql-python/graphene/releases/tag/v2.0.0
@mvanlonden correct me if im wrong but tag v2.0.0 has been out for a while ..
And i can see no commit mentioning subscriptions. in commits or in the tags that follow.
Could you provide an example implementation of a subscription.
would one need to use channels in django for this .. ?
@japrogramer it is not documented well/at all but documentation needs are tracked here. This commit added support for subscriptions.
https://github.com/eamigo86/graphene-django-subscriptions is an example of using graphene-django with subscriptions although we'd like to implement the functionality natively in graphene-django
Mind submitting a PR with documentation for subscriptions?
@mvanlonden that has been possible for a while, i thought your announcement was about making the message delivered via graphene.
Not sure this issue should be closed, since the apollo integration still takes some set up.
even with a simple lib.
case and point.
at any rate, here is another successful attempt,
This time i wrote it for aiohttp
from graphql import GraphQLError
from aiograph.redistools.pubsub import subscribe, reader
from aiograph.redistools.utils import create_redis
from graphene.types import generic
import graphene
import asyncio
class TestType(graphene.ObjectType):
name = "test"
msg = graphene.String()
class Query(object):
test_field = graphene.Field(TestType, msg=graphene.String())
async def resolve_test_field(self, info, **args):
msg = args.get('msg', None)
return TestType(msg=msg)
class MessageMutation(graphene.Mutation):
status = graphene.Int()
errors = generic.GenericScalar()
class Arguments:
channel = graphene.String(required=True)
msg = graphene.String(required=True)
@staticmethod
async def mutate(root, info, **input):
try:
redis = await create_redis()
await redis.publish(
input.get('channel'),
input.get('msg'))
return MessageMutation(status=200, errors=None)
except Exception as e:
breakpoint()
raise GraphQLError('An Error occoured.')
class TestSubscription(object):
test_updated = graphene.Field(TestType, channel=graphene.String())
async def resolve_test_updated(root, info, **input):
loop = asyncio.get_event_loop()
def next(message):
return TestType(msg=message)
redis = await create_redis()
channel = await subscribe(redis, input.get('channel', 'default'))
async for i in reader(channel[0]):
# Here we yield the actual object for the TestType
yield next(i)
class Mutations(object):
send_message = MessageMutation.Field()
from contextlib import suppress
from aiohttp_session import get_session
from aiohttp import web, WSMsgType
from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql import graphql
from config.settings import settings
from config.schema import schema
import asyncio
import functools
import json
import rx
from .utils import format_response, run_aiograph
# import the logging library
import logging
logger = logging.getLogger('asyncio')
async def graph_handler(request):
session = await get_session(request)
last_visit = session['last_visit'] if 'last_visit' in session else None
if request.content_type in ('application/graphql', 'application/json'):
payload = json.loads(await request.text())
with suppress(NameError):
message = dict()
message['request'] = request
result = await run_aiograph(
payload,
executor=AsyncioExecutor(loop=asyncio.get_event_loop()),
allow_subscriptions=False,
return_promise=True,
**{'context_value': message}
)
response = format_response(result)
return web.json_response(response)
class WebSocket(web.View):
def __init__(self, *args, **kwargs):
self.loop = asyncio.get_event_loop()
return super().__init__(*args, **kwargs)
async def get(self):
ws = web.WebSocketResponse()
await ws.prepare(self.request)
session = await get_session(self.request)
if 'WebSockets' in self.request.app:
self.request.app['websockets'].append(ws)
else:
self.request.app['websockets'] = ws
async for msg in ws:
if msg.type == WSMsgType.text:
data = json.loads(msg.data)
if data['type'] == 'close':
await ws.close()
else:
if data['type'] == 'connection_init':
logger.debug('connection_init')
await ws.send_json(
{ 'type': 'websocket.send',
'text': json.dumps({'type': 'connection_ack'})}
)
if data['type'] == 'start':
await self._start(ws, data)
if data['type'] == 'stop':
await ws.send_str(str(data))
elif msg.type == WSMsgType.error:
log.debug(
'ws connection closed with exception %s'\
% ws.exception())
if 'WebSockets' in self.request.app:
self.request.app['websockets'].remove(ws)
logger.debug('websocket connection closed')
return ws
async def _start(self, ws, data):
message = dict()
message['request'] = self.request
result = await run_aiograph(
data,
allow_subscriptions=True,
return_promise=True,
**{'context_value': message}
)
if isinstance(result, rx.Observable):
logger.debug('result is an Observable')
result.subscribe(lambda t: self.loop.create_task(self._send_result(ws, t)))
else:
response = format_response(result)
await ws.send_json(response)
async def _send_result(self, ws, result):
await ws.send_json(format_response(result))
Since this was a quick test this sufficed.
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Humble begginings</title>
<meta name="description" content="Test bed">
<meta name="author" content="me">
</head>
<body>
<script language="JavaScript">
socket = new WebSocket("ws://" + window.location.host + "/wsgql");
socket.onmessage = function(e) {
alert(JSON.parse(e.data));
}
socket.onopen = function(){
alert('Connection to server started')
}
socket.onclose = function(event){
if(event.wasClean){
alert('Clean connection end')
}else{
alert('Connection broken')
}
};
socket.onerror = function(error){
alert(error);
}
socket.onopen = function(){
alert('Connection to server started')
var msg = {
type: 'start',
query:
'subscription { testUpdated(channel:"test"){ msg }}',
}
socket.send(JSON.stringify(msg));
}
console.log('about to send message.')
</script>
</body>
</html>
@japrogramer: wondering if it makes sense ditching graphene subscriptions altogether for the sake of apollo client integration if there is no out-of-the box support for it.
Say, most minimal setup: Django users running on top of Postgres could simply run a standalone subscriptions-transport-ws with graphql-postgres-subscriptions, in graphene endpoint mutations we just directly use postgres's pubsub with NOTIFY pinAdded; '{"pinAdded":{"title":"wow",", "id":0}}'
Personally, I wouldn't consider graphql-postgres-subscriptions a suitable replacement for subscription functionality in Graphene. Obviously different people will have different needs, but if I'm building a GraphQL API for a Python project, divorcing my subscriptions backend from the main app introduces some undesirable complications. Unless you're just providing full, open access to your database, it would mean having to re-implement whatever authentication and permissions are involved in accessing your database. If you're using Django, for example, you no longer have access to the authentication backend and would need to re-implement it in Node, at which point why are you bothering with Python/Django/Graphene at all?
@joshourisman: this makes a good point, thanks. (I just had a thought of passing temporary subscription tokens to clients on login for things they can subscribe to, then verifying them upon subscription on subscriptions-transport-ws side)
Basically, I have a legacy (somehow large) django app. I'm investigating into graphene integration on top of that and hooking up apollo client with angular / native iOS / native Android clients with hopes of reducing amount of logic needed to be written to support the apis. And this seemingly build-in subscription thing into Apollo clients across each platform (haven't used any graphql before today pretty much) looks promising for the first glance.
@ambientlight you can implement subscriptions with django-channels and apollo, ive done it, its not trivial. Here is an example implementation that is similar to mine, in the way it delivers the message to the client. https://github.com/datadvance/DjangoChannelsGraphqlWs
Does anyone know when subscriptions will be implemented natively in graphene?
@dspacejs because graphene is very separated from the common python frameworks .. it is difficult to implement a delivery system for all of them in this package.
However i would like to see packages like graphene-django and graphene-flask to implement their own handler/view that works for their platform .. following the language API
closely enough so that it works with apollo. That way we don't have users trying to roll their own implementation every time someone wants subscriptions And all the eyes of the community are in one implementation for the best support.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
I didn't get, is there a native subscription support in Graphene or not. Shall I drop my https://github.com/datadvance/DjangoChannelsGraphqlWs or not yet?
@prokher please don't drop the only one I could get working 😄
Is there any updates on this subject?
Hi everyone! I just released a GraphQL subscriptions implementation for Graphene + Django based on an internal implementation we’ve been using in production for the last 6 months at Jetpack (https://tryjetpack.com/). It builds on @tricoder42's gist implementation and takes inspiration from the great work that @japrogramer, @eamigo86 and @prokher have done. Since this is the canonical Graphene subscriptions issue I thought I would post it here.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Any updates? (Trying to keep the discussion alive.) :)
Most helpful comment
@Eraldo @hballard Yes, I think I should post an update here as I'm working full on subscriptions now.
Some thoughts about my journey: the way Apollo-Subscriptions use to manage subscriptions was not very friendly for the developer, needing to hack around the resolution and a specific PubSub implementation that was "bypassing" the GraphQL engine for adapting it into subscriptions.
The reason for that is the GraphQL-js engine was not ready for subscriptions (meaning that was only able to return either a promise or a static value, but not a async iterator).
However GraphQL-js recently added a way to subscribing to a GraphQL query (that return an async iterator a.k.a.
Observable) that pushed towards simpler and cleaner implementations of subscriptions that decouple the subscription resolution from the "listener" on the subscription.That led to better implementations of the transport mechanisms in GraphQL subscriptions like subscriptions-transport-ws.
So, in summary, subscriptions is something that should be bundled fully into the GraphQL engine, in a way that is easy to plug any mechanisms, such as:
That don't require any specific pub/sub implementation and, eventually, let this decision to the developer (in case it want to use it).
For the next version of Graphene,
2.0I plan to have subscriptions bundled into the Engine :)There is already a branch in graphql-core where I'm doing the research process.
I will keep updating this thread with more information as I keep working on it.