I am trying to implement Server-Sent Events in my Flask application by following this simple recipe: http://flask.pocoo.org/snippets/116/
For serving the app, I use gunicorn with a custom BaseApplication subclass and gevent workers.
A minimal version of my code looks like this:
import multiprocessing

from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response

app = Flask('minimal')

# NOTE: This is the global list of subscribers
subscriptions = []


class ServerSentEvent(object):
    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.id: "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k)
                 for k, v in self.desc_map.iteritems() if k]
        return "%s\n\n" % "\n".join(lines)


@app.route('/api/events')
def subscribe_events():
    def gen():
        q = Queue()
        print "New subscription!"
        subscriptions.append(q)
        print len(subscriptions)
        print id(subscriptions)
        try:
            while True:
                print "Waiting for data"
                result = q.get()
                print "Got data: " + result
                ev = ServerSentEvent(unicode(result))
                yield ev.encode()
        except GeneratorExit:
            print "Removing subscription"
            subscriptions.remove(q)
    return Response(gen(), mimetype="text/event-stream")


@app.route('/api/test')
def push_event():
    print len(subscriptions)
    print id(subscriptions)
    for sub in subscriptions:
        sub.put("test")
    return "OK"


class GunicornApplication(BaseApplication):
    def __init__(self, wsgi_app, port=5000):
        self.options = {
            'bind': "0.0.0.0:{port}".format(port=port),
            'workers': multiprocessing.cpu_count() + 1,
            'worker_class': 'gevent',
            'preload_app': True,
        }
        self.application = wsgi_app
        super(GunicornApplication, self).__init__()

    def load_config(self):
        config = dict([(key, value) for key, value in self.options.iteritems()
                       if key in self.cfg.settings and value is not None])
        for key, value in config.iteritems():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    gapp = GunicornApplication(app)
    gapp.run()
The problem is that the list of subscribers seems to be different for every worker. If worker #1 handles the /api/events endpoint and adds a new subscriber to the list, the client only receives events that are pushed when worker #1 also happens to handle the /api/test endpoint.
Curiously enough, the actual list object seems to be the same for each worker, since id(subscriptions) returns the same value in every worker.
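Yes, this is expected: Gunicorn forks itself to create its workers, so each worker is a separate process with its own copy of the subscriptions list. Because your app is loaded before the fork (preload_app is True), every copy sits at the same virtual address, which is why id(subscriptions) matches even though the lists are independent.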
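To see matching ids with diverging contents outside of Gunicorn, here is a minimal sketch that calls os.fork directly. It is Unix only and written for Python 3, unlike the Python 2 code in the question. The name run_demo and the placeholder string are made up for illustration; the child process stands in for a worker that receives a new subscription.

```python
import os

# Module-level list, created before the fork (like a preloaded app).
subscriptions = []


def run_demo():
    """Fork once and compare the list as seen by parent and child."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: plays the role of a worker handling /api/events.
        os.close(read_fd)
        subscriptions.append("queue-from-child")
        report = "%d %d" % (id(subscriptions), len(subscriptions))
        os.write(write_fd, report.encode())
        os.close(write_fd)
        os._exit(0)  # leave immediately, skipping the parent's cleanup code

    # Parent process: read what the child saw, then inspect its own list.
    os.close(write_fd)
    data = os.read(read_fd, 1024).decode()
    os.close(read_fd)
    os.waitpid(pid, 0)
    child_id, child_len = (int(part) for part in data.split())
    return child_id, child_len, id(subscriptions), len(subscriptions)


if __name__ == "__main__":
    child_id, child_len, parent_id, parent_len = run_demo()
    print("same id:", child_id == parent_id)
    print("child length:", child_len, "parent length:", parent_len)
```

The append in the child never shows up in the parent, because after the fork each process works on its own copy of the memory.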
If you want to support this use case, I recommend putting Redis or something similar between the workers.
I use Redis pub/sub as a bus between my workers to share data between my WebSocket clients, with API-Hour (based on Gunicorn): http://www.api-hour.io/, aiohttp.web: http://aiohttp.readthedocs.org/en/latest/web.html#websockets, and asyncio_redis: http://asyncio-redis.readthedocs.org/en/latest/pages/examples.html#pubsub
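For the Flask app in the question, the same idea could look roughly like the sketch below. This is not the asyncio_redis setup I run, just an illustration using the redis-py package with Python 3. It assumes a Redis server reachable at redis://localhost:6379/0 and a channel called "events"; the names format_sse, event_stream and create_app are hypothetical helpers, not part of Flask or Gunicorn.

```python
def format_sse(data, event=None, event_id=None):
    """Encode one Server-Sent Events frame as text."""
    lines = []
    if event_id is not None:
        lines.append("id: %s" % event_id)
    if event is not None:
        lines.append("event: %s" % event)
    # Multi-line payloads need one "data:" line per line of text.
    for chunk in str(data).splitlines() or [""]:
        lines.append("data: %s" % chunk)
    return "\n".join(lines) + "\n\n"


def event_stream(pubsub, channel):
    """Yield one SSE frame per message published on `channel`.

    `pubsub` is expected to behave like a redis-py PubSub object.
    """
    pubsub.subscribe(channel)
    try:
        for message in pubsub.listen():
            if message["type"] != "message":
                continue  # skip subscribe/unsubscribe confirmations
            data = message["data"]
            if isinstance(data, bytes):
                data = data.decode("utf-8")
            yield format_sse(data)
    finally:
        # Runs when the stream ends or the client disconnects.
        pubsub.unsubscribe(channel)


def create_app(redis_url="redis://localhost:6379/0", channel="events"):
    """Build the Flask app; the URL and channel name are assumptions."""
    # Imported here so the helpers above work without flask/redis installed.
    import redis
    from flask import Flask, Response

    app = Flask("minimal")
    client = redis.Redis.from_url(redis_url)

    @app.route("/api/events")
    def subscribe_events():
        # Each request gets its own subscription, whichever worker serves it.
        return Response(event_stream(client.pubsub(), channel),
                        mimetype="text/event-stream")

    @app.route("/api/test")
    def push_event():
        # Redis delivers this to the subscribers in every worker.
        client.publish(channel, "test")
        return "OK"

    return app
```

With this layout no worker keeps a shared in-process list: a worker that handles /api/test only publishes, and Redis delivers the message to every open /api/events stream regardless of which worker holds it.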
Great, thank you! Sorry for spamming the issue tracker here :/