Flask-SocketIO: Problem running SocketIO with Celery

Created on 7 Sep 2016 · 21 comments · Source: miguelgrinberg/Flask-SocketIO

Hi @miguelgrinberg, first of all, congratulations on your work.
I'm trying to use Socket.IO to broadcast messages to connected clients. First I did this synchronously, and it worked exactly as I hoped.
After that, I tried to do the same thing from a Celery task, but it fails with "working outside of application context" (as other people have already reported here in another issue).
So I tried to follow the documentation (http://flask-socketio.readthedocs.io/en/latest/#using-multiple-workers).

Old code:

# Server entry point
from flask_socketio import SocketIO, emit
from xproject.app import create_app

app = create_app()
socketio = SocketIO(app)
if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

# Broadcast helper (separate module)
from flask_socketio import emit

def broadcast_crud(operation, entity_name):
    emit('entity_change', {'operation': operation, 'entity_name': entity_name},
         broadcast=True, namespace='/websocket')

Old console output:
 * Restarting with stat
 * Debugger is active!
 * Debugger pin code: 270-121-125
(79052) wsgi starting up on http://127.0.0.1:8081
(79052) accepted ('127.0.0.1', 60479)
127.0.0.1 - - [07/Sep/2016 17:02:51] "GET /socket.io/?EIO=3&transport=polling&t=1473278571127-36 HTTP/1.1" 200 401 0.000890
127.0.0.1 - - [07/Sep/2016 17:02:51] "POST /socket.io/?EIO=3&transport=polling&t=1473278571135-37&sid=cda480af083e4a0c87ad9cc9b085bbe0 HTTP/1.1" 200 219 0.001070
(79052) accepted ('127.0.0.1', 60481)
(79052) accepted ('127.0.0.1', 60483)

New code:

from flask_socketio import SocketIO
from xproject.app import create_app

from xproject.settings import CeleryConfig

app = create_app()
socketio = SocketIO(app, message_queue=CeleryConfig.CELERY_BROKER_URL)
if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

New console output:
 * Restarting with stat
 * Debugger is active!
 * Debugger pin code: 270-121-125
(79241) wsgi starting up on http://127.0.0.1:8081
(79241) accepted ('127.0.0.1', 60523)

When I run the new code, the WebSocket connections always stay pending (according to Chrome's Developer Tools). Before this change, Chrome's Developer Tools showed status 200 for the WebSocket connections (as the server console output above also shows). The same thing happens with HTTP requests to the REST services, which were also working fine before.

About Celery configuration

I configured Celery with Redis as the result backend ("task logger") and RabbitMQ as the message broker.
See my Celery configuration:

from celery import Celery

from xproject.settings import PROJECT_APPS, CeleryConfig

celery_application = Celery('app', broker=CeleryConfig.CELERY_BROKER_URL)
celery_application.config_from_object(CeleryConfig)
celery_application.autodiscover_tasks(lambda: PROJECT_APPS)

# xproject/settings.py
class CeleryConfig(object):
    CELERY_BROKER_URL = 'amqp://xproject:password@localhost:5672/xprojecthost'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERYD_CONCURRENCY = 5

With this configuration using these two services, my project executes asynchronous tasks normally.
I always start Celery before running my server app.

Celery execution log:
---- **** ----- 
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x10a46ad10
- ** ---------- .> transport:   amqp://xproject:**@localhost:5672/xprojecthost
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

So I tried many changes, without any success. If you can help me, I'll be grateful.

Thanks.


I tried an equivalent solution using python-socketio and got a similar result. The difference is that when I try to stop the process (Ctrl+C in the terminal), an exception is shown and then the WebSocket connections succeed. If I make an HTTP request to any REST service, the response is Internal Server Error (505).

Code
import eventlet
import eventlet.wsgi
import socketio

from xproject.app import create_app
from xproject.settings import CeleryConfig

mgr = socketio.KombuManager(CeleryConfig.CELERY_BROKER_URL)
sio = socketio.Server(client_manager=mgr)
app = create_app()

@sio.on('connect', namespace='/websocket')
def connect(sid, environ):
    print 'connected - sid: ' + sid
    print 'connected - sid: ' + str(environ)
    sio.emit('connect_ack', {'data': 'Server Response!'})

@sio.on('client_ack', namespace='/websocket')
def on_client_ack(sid, data):
    print 'client_ack - sid: ', sid
    print 'client_ack - data: ', str(data)
    sio.emit('server_ack', {'data': 'Server Response!'})

if __name__ == '__main__':
    app = socketio.Middleware(sio, app)
    eventlet.wsgi.server(eventlet.listen(('', 8081)), app)
Console output

(87539) wsgi starting up on http://0.0.0.0:8081
(87539) accepted ('127.0.0.1', 63448)
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 126, in _thread
    for message in self._listen():
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/socketio/kombu_manager.py", line 65, in _listen
    message = queue.get(block=True)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/kombu/simple.py", line 53, in get
    timeout=timeout and remaining)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/kombu/connection.py", line 275, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
    return connection.drain_events(**kwargs)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/connection.py", line 303, in drain_events
    chanmap, None, timeout=timeout,
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/connection.py", line 366, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
    return self.method_reader.read_method()
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
    frame_type, channel, payload = read_frame()
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/transport.py", line 154, in read_frame
    frame_header = read(7, True)
  File "/Users/andremeireles/dev/pyenv-xproject/lib/python2.7/site-packages/amqp/transport.py", line 277, in _read
    s = recv(n - len(rbuf))
KeyboardInterrupt

(87539) accepted ('127.0.0.1', 63451)
127.0.0.1 - - [07/Sep/2016 19:15:15] "GET /socket.io/?EIO=3&transport=polling&t=1473286490796-591 HTTP/1.1" 200 401 24.406180
127.0.0.1 - - [07/Sep/2016 19:15:15] "GET /socket.io/?EIO=3&transport=polling&t=1473286493504-335 HTTP/1.1" 200 401 0.000612
127.0.0.1 - - [07/Sep/2016 19:15:20] "GET /socket.io/?EIO=3&transport=polling&t=1473286520288-592 HTTP/1.1" 200 401 0.000380
connected - sid: dcac751f45fa43ec8e067a49592403db
connected - sid: {'HTTP_REFERER': 'http://127.0.0.1:8080/', 'SCRIPT_NAME': '', 'REQUEST_METHOD': 'GET', 'PATH_INFO': '/socket.io/', 'HTTP_ORIGIN': 'http://127.0.0.1:8080', 'SERVER_PROTOCOL': 'HTTP/1.0', 'QUERY_STRING': 'EIO=3&transport=polling&t=1473286520288-592', 'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36', 'HTTP_CONNECTION': 'keep-alive', 'REMOTE_PORT': '63451', 'SERVER_NAME': '127.0.0.1', 'REMOTE_ADDR': '127.0.0.1', 'eventlet.input': <eventlet.wsgi.Input object at 0x111a53dd0>, 'wsgi.url_scheme': 'http', 'SERVER_PORT': '8081', 'CONTENT_TYPE': 'text/plain', 'wsgi.input': <eventlet.wsgi.Input object at 0x111a53dd0>, 'HTTP_HOST': 'localhost:8081', 'wsgi.multithread': True, 'eventlet.posthooks': [], 'HTTP_ACCEPT': '*/*', 'wsgi.version': (1, 0), 'RAW_PATH_INFO': '/socket.io/', 'GATEWAY_INTERFACE': 'CGI/1.1', 'wsgi.run_once': False, 'wsgi.errors': <open file '<stderr>', mode 'w' at 0x10ea091e0>, 'wsgi.multiprocess': False, 'HTTP_ACCEPT_LANGUAGE': 'en-US,en;q=0.8,pt;q=0.6', 'headers_raw': (('Host', 'localhost:8081'), ('Connection', 'keep-alive'), ('Origin', 'http://127.0.0.1:8080'), ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'), ('Accept', '*/*'), ('Referer', 'http://127.0.0.1:8080/'), ('Accept-Encoding', 'gzip, deflate, sdch'), ('Accept-Language', 'en-US,en;q=0.8,pt;q=0.6'), ('Cookie', 'io=fcf1cda089664d63aaacd2c050a0ff06; manage_view=thumb; advanced_toggle_checked=1; snatched_view=list; soon_view=thumb; suggest_view=thumb; late_view=list; blu_ray_com_new_releases_view=thumb; imdb_box_office_view=thumb; imdb_top_dvd_rentals_view=thumb; wanted_view=thumb')), 'HTTP_ACCEPT_ENCODING': 'gzip, deflate, sdch', 'HTTP_COOKIE': 'io=fcf1cda089664d63aaacd2c050a0ff06; manage_view=thumb; advanced_toggle_checked=1; snatched_view=list; soon_view=thumb; suggest_view=thumb; late_view=list; blu_ray_com_new_releases_view=thumb; imdb_box_office_view=thumb; imdb_top_dvd_rentals_view=thumb; wanted_view=thumb'}
127.0.0.1 - - [07/Sep/2016 19:15:20] "POST /socket.io/?EIO=3&transport=polling&t=1473286520296-593&sid=dcac751f45fa43ec8e067a49592403db HTTP/1.1" 200 219 0.165738
(87539) accepted ('127.0.0.1', 63456)
127.0.0.1 - - [07/Sep/2016 19:15:20] "GET /socket.io/?EIO=3&transport=polling&t=1473286520297-594&sid=dcac751f45fa43ec8e067a49592403db HTTP/1.1" 200 248 0.000231
client_ack - sid:  dcac751f45fa43ec8e067a49592403db
client_ack - data:  {u'data': u"I'm connected!"}

@andremeirelesa I think I know what's happening.

It seems you are following the Using Multiple Workers section of the documentation to configure your Celery workers. This is incorrect: "multiple workers" in that context refers to running multiple Socket.IO servers. In your case, it seems you have a single Socket.IO server.

Emitting from Celery is a different use case, because the Celery workers do not run Socket.IO servers, they connect to an existing server through the message queue. The part of the documentation that covers this is Emitting from an External Process.

The difference is really minimal. For example, in the Socket.IO server case you initialize the SocketIO object as follows:

socketio = SocketIO(app, message_queue='redis://')

While in the case of an external process, such as a Celery worker, you do this:

socketio = SocketIO(message_queue='redis://')

The difference is that when you initialize SocketIO as a server, you provide the Flask application instance as a first argument. This tells Socket.IO to create a server and associate it with the given application. If you do not pass an application instance, then Socket.IO just connects to the message queue, but does not start a new server, which is what you want to happen in the Celery workers.

@miguelgrinberg I think I'm doing something else wrong; I had already tested it the way you said, with no success. I tried again, and the Celery console does show:

[2016-09-08 09:11:17,452: INFO/Worker-4] Server initialized for eventlet.
[2016-09-08 09:11:17,452: INFO/Worker-4] emitting event "entity_change" to all [/websocket]

but the socket clients don't receive the message.
Let me describe my current scenario. I have one application that provides both the WebSocket server and the REST (blueprint) server. I'm doing it like this:

app = create_app()
socketio = SocketIO(app)
if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

Some HTTP requests to the REST services trigger asynchronous tasks that are managed by Celery. After a task finishes, I want to notify the clients over WebSocket. It's something like this:

# REST endpoint (blueprint)
@blueprint.route('/store', methods=[HTTPMethod.POST])
def save_store(store):
    store_manager.save(store)
    return 'Store has been created'

# Celery task
@celery_application.task
def save(store):
    db.save('store', store)
    ws_manager.notify('created', 'store')

# ws_manager module
def notify(operation, entity_name):
    sio = SocketIO(message_queue=CeleryConfig.CELERY_BROKER_URL)
    sio.emit('entity_change', {'operation': operation, 'entityName': entity_name},
             broadcast=True, namespace='/websocket')

After your response I began to think that I'm doing something very wrong: since the server application and Celery run in separate processes, I can't see how Celery gets back to the server app process (which is the one keeping the connections with the clients).

Thanks

@andremeirelesa are you running Celery in eventlet mode or in its default mode?

What I recommend is that you use eventlet on the Flask-SocketIO server, but there is really no good reason to also use eventlet with Celery. The catch is that if you use Celery in its normal mode, you want the SocketIO instance that you create in that process to also run without eventlet, which you can do by passing async_mode='threading'.

Take a look at how I initialize SocketIO in my flack example application: https://github.com/miguelgrinberg/flack/blob/master/flack/__init__.py#L38-L51

Hi @miguelgrinberg, after following your recommendations, broadcasting to WebSocket clients from Celery tasks works fine, but I think my solution is a little different from yours (flack). Look:

In the server script (run_server.py), I'm doing:

app = create_app()
socketio = SocketIO()
socketio.init_app(app,
                  message_queue=CeleryConfig.CELERY_BROKER_URL, async_mode='threading')

On Celery startup (manage.py run_celery) I don't instantiate SocketIO; I kept it as it was:

...
@manager.command
def run_celery():
    command = 'celery -A xproject worker --loglevel=INFO'
    os.system(command)
...
if __name__ == '__main__':
    manager.run()

celery_application = Celery('app', broker=CeleryConfig.CELERY_BROKER_URL)
celery_application.config_from_object(CeleryConfig)
celery_application.autodiscover_tasks(lambda: PROJECT_APPS)

I only changed the broadcast service. Now I'm doing:

def broadcast_crud(operation, entity_name):
    sio = SocketIO(message_queue=CeleryConfig.CELERY_BROKER_URL)
    sio.emit('entity_change',  {'operation': operation, 'entityName': entity_name,}, 
        broadcast=True, namespace='/websocket')

First I run run_server.py, then I run manage.py run_celery (I preferred not to use Manager to run the server because I saw you had to override Server to make it work). Is my solution OK? At least it works! =)

However, I noticed a strange behavior: when I try to stop run_server.py, a process keeps listening on the same port, and I have to stop it with kill in the terminal. Is there any solution for that, or is this normal behavior?

I'm grateful for your help. Thanks!

@andremeirelesa I think your solution is fine, but I have a couple of comments.

In run_server.py, I recommend that you use eventlet or gevent for async_mode, not threading. The threading mode is very inefficient and only intended for development use. Using threading in the Celery worker is totally fine, because you are not running a server in that process.

I also see a potential performance problem in constantly creating new SocketIO instances in the broadcast_crud function. If that function is called frequently, I recommend that you move the object creation outside of the function, like I do.

The problem with stopping a server with Ctrl-C is a known one. It is very tricky to get right, because most web servers have issues with stopping (see #199). You are now using threading mode, which means that your web server is Flask's development web server, and this server has one of the worst behaviors. If you switch to eventlet or gevent as I recommended above it'll be better, but you may need to hit Ctrl-C twice in some cases to get the server to quit.

@miguelgrinberg I understand, but when I use async_mode='eventlet' in run_server.py, I'm back to the first problem: WebSocket connections never complete and the server waits forever.
When I changed to threading, it worked.

If you get hangs while using eventlet or gevent, that's probably because you did not monkey patch the Python standard library. Monkey patching makes the standard library compatible with eventlet's green threads.

@miguelgrinberg I have now monkey patched for eventlet's green threads, and I think everything is OK!
Thank you very much, and congratulations on your work.

Hi @miguelgrinberg,
Now I'm trying to use uwsgi to run the server, but I'm having the same pending-connection problem I had before monkey patching for eventlet. I tried to set up uwsgi based on the flack project.

wsgi.py

import eventlet
from flask_socketio import SocketIO, emit

eventlet.monkey_patch()

from xproject.settings import CeleryConfig
from xproject.app import create_app

application = app = create_app()

socketio = SocketIO()
socketio.init_app(application, message_queue=CeleryConfig.CELERY_BROKER_URL, async_mode='eventlet')


@socketio.on_error(namespace='/websocket')
def error_handler(e):
    print('An error has occurred: ' + str(e))


@socketio.on('connect', namespace='/websocket')
def connect():
    print 'connected!'


@socketio.on('client_ack', namespace='/websocket')
def on_client_ack(message):
    print 'client_ack - message: ', message
    emit('server_ack', {'data': 'Server Response!'})

I'm trying to run:

uwsgi --gevent 100 --http 0.0.0.0:8081 --wsgi-file wsgi.py

All connections stay pending (both to the API and to the WebSocket server).

@andremeirelesa uwsgi is not compatible with eventlet; you have to use gevent, and even with gevent the setup needed to get WebSocket working is a bit tricky. So basically, your options are:

  • use gunicorn instead of uwsgi. Then you can still use eventlet.
  • stay with uwsgi and switch to gevent, plus uwsgi's own websocket implementation. This should work but there is no development server option, you have to always run the application through uwsgi.

It's working fine using gunicorn! Thanks

I am using Flask-SocketIO in git-webhook, and everything is OK.

But it cannot be stopped with Ctrl+C, and when I use supervisor to manage start/stop, after running supervisor restart webhook the app sometimes fails to restart.

How can I solve this?

@hustcc Are you sure the problem is caused by Flask-Socket.IO? If you can provide me a small example that shows how Ctrl-C does not work that would be helpful, because I stop my Flask-SocketIO servers all the time, and never had a problem besides having to hit Ctrl-C twice for some web servers.

The code is here: a very simple Flask project with Celery and SocketIO.

app = Flask(__name__)

socketio = SocketIO(app, async_mode='threading',
                    message_queue=app.config['SOCKET_MESSAGE_QUEUE'])

It is developed on Windows and run with python run_webhook.py. Because HTTP requests were blocked by the WebSocket, I added async_mode='threading'.

Then, after I press Ctrl+C, nothing happens.

The only thing that works is closing the CMD window. This is a problem.


In the online version (here), I run it with gunicorn:

gunicorn --worker-class=eventlet -w 1 -t 30 -b 127.0.0.1:18340 run_webhook:app

and I manage the process with supervisor, using the config below:

[program:webhook]
directory=/home/xxx/webhook/
command=gunicorn --worker-class=eventlet -w 1 -t 30 -b 127.0.0.1:10178 run_webhook:app
user=xxx
autostart=false
autorestart=false

Then supervisor restart webhook always stops the app, but fails to start it until I run supervisor restart webhook again. This may be a problem too.

It seems OK after I run it with:

gunicorn -k "geventwebsocket.gunicorn.workers.GeventWebSocketWorker" -w 1 -t 30 -b 127.0.0.1:18340 run_webhook:app

Maybe it is a problem with eventlet. But the problem in the Windows CMD still exists.

Using async_mode='threading' is a bad idea. That is a mode that is only good for development, there is no support for websocket in this mode. And there is a known issue with the Flask development server and Ctrl-C that is probably what you are seeing. Please remove async_mode, so that the best async mode is selected based on installed packages, which in your case should be eventlet.

Supervisor has a few options to provide more control over applications that are tricky to kill. In particular, take a look at stopasgroup, which usually needs to be set to true for gunicorn.

OK, thank you for your answer. When I use the command below

gunicorn --worker-class=eventlet -w 1 -t 30 -b 127.0.0.1:18340 run_webhook:app

to start the Flask app, will SocketIO override the async_mode?

Because on Windows, if I remove async_mode='threading', HTTP requests are blocked by SocketIO (both gevent and eventlet are installed). How can I solve this on Windows?

If I remove async_mode='threading' and run socketio.run(app, host='0.0.0.0', port=18340) on Linux with eventlet installed, requests are also blocked by SocketIO.

It only works when I run gunicorn --worker-class=eventlet -w 1 -t 30 -b 127.0.0.1:18340 run_webhook:app.

@hustcc I'm not sure how well eventlet and gevent work on Windows; I don't think it is a very well tested platform.

Setting async_mode to threading effectively causes eventlet and gevent to be ignored, so you might as well uninstall them if they don't work well. The main difference is that in threading mode there is no websocket support.

My recommendation, though, is that you install Cygwin, or use the new Ubuntu support for Windows 10. I have used both, and eventlet and gevent work well with them.

@hustcc oh, and also, if you get requests blocking while using gevent or eventlet, it could be that you are using code or libraries that need monkey patching to work with green threads. Try monkey patching and see if that helps.
