Flask-socketio: Emits all at once, at the end, despite sleep?

Created on 19 Apr 2018 · 9 Comments · Source: miguelgrinberg/Flask-SocketIO

Very grateful for any help! I might be missing a trick, but I have tried two different approaches, both without success: the messages are all sent at once when the function exits, whereas I want them to go out one by one.

My first attempt uses sleeps to block the thread. I have seen other people use this approach successfully, and from what I have read I can't understand why it doesn't work, but emit is called 100 times before the browser gets anything.

@socketio.on('my event2')
def threadtest(message):
    print('got the message')
    for i in range(100):
        print('sending call'+str(i))
        socketio.emit('update','call'+str(i))
        gevent.sleep(0) # I expect one of these to block the thread, allowing the emit to happen
        eventlet.sleep(0)
        socketio.sleep(0)
        if (i%10==0):
            time.sleep(1)

The messages are only sent at the end, all in one go. I couldn't understand why that shouldn't work, so here is another attempt, using a queue:

    q = eventlet.queue.Queue()

    def worker():
        while (True):
            item=q.get()

            if item is None:
                print('worker: item is none')
                break
            print('worker ' + item)
            socketio.emit('update',item)
            socketio.send(item)
            gevent.sleep(0) # I expect one of these to block the thread, allowing the emit to happen
            eventlet.sleep(0)
            socketio.sleep(0)
            q.task_done()

    @socketio.on('my event2')
    def threadtest(message):
        print('got the message')
        t = threading.Thread(target=worker)
        t.start()
        for i in range(100):
            print('queuing:call'+str(i))
            q.put('call'+str(i))
            if (i%10==0):
                time.sleep(1)

The result is very similar: emit() is called 100 times, and only after the 100th call is all the data sent to the browser.

I am monkey-patching in init.py like this

from eventlet import monkey_patch
monkey_patch()
print("patched eventlet")

The JavaScript in the browser looks like this:

$( document ).ready(function() {
    console.log("up");
    var socket = io.connect('http://' + document.domain + ':' + location.port);
    socket.on('connect', function() {
        console.log("connected");
        socket.emit('my event2', {data: 'details bla|bla'});
    });
    socket.on('message', function(msg){
        $('#test').append(msg+'<br />');
        console.log(msg);
    });
    socket.on('update', function(msg){
        $('#test').append(msg+'<br />');
        console.log(msg);
    });
});

In all cases, emit is called 100 times before the browser gets any output, whereas I expect the browser to receive the messages gradually, because any one of the sleep() calls should block the thread.

Grateful for any help!

All 9 comments

There is something I don't understand. You have a loop that runs 100 times, with a sleep(0) in it (by the way, you should just use socketio.sleep(0), that's the only sleep call that you need).

This 100-iteration loop is not going to take long to run; my assumption is that it will run to completion in less than a second. So how are you able to tell that the browser does not receive messages progressively with such a short loop? Could you change the sleep to socketio.sleep(1), for example, so that there is a one-second delay between each emit?

Actually it pauses for a second every ten loops, so it takes a few seconds to run (and I did also make it x 1000 loops late last night when I really wanted to be sure.)

You can see from the various sleeps that I was short of sleep! Thanks, socketio.sleep it is then, and with socketio.sleep(1) it works. I have seen a lot of references to sleep(0) in other threads, but sleep(0) doesn't work for me. For my purposes sleep(1) is actually fine, but just so I learn: looking at the code, shouldn't sleep(0) also work?

Again thanks, and thanks for writing this!

Just to add, and in case it helps, I have been doing more experiments -

  • The first way above (no separate queue) doesn't work. I have tried passing zero and various numbers of seconds to socketio.sleep().
  • The second way (with a queue and a worker thread doing the socketio.emit() calls) does work if the argument is > 0 (e.g. 0.1), but does not work if it is 0.

(That is all using the eventlet model.)

Where do you have the sleep(0) that does not work, in the threadtest function or in the worker() function? You really don't need to sleep in worker() because you are waiting on the queue, that also allows other tasks to run.

No, I know it should be with the sleep(0) in the function, and now that you point it out, it is more surprising that it works the way I have it.

This works:

def worker():
    while (True):
        item = q.get()
        if item is None:
            print('worker: item is none')
            break
        print('worker: ' + item)
        socketio.emit('update', item)
        socketio.sleep(0.1)  # socketio.sleep in the worker
        q.task_done()


@socketio.on('my event2')
def threadtest(message):
    print('got the message')
    t = threading.Thread(target=worker)
    t.start()
    for i in range(100):
        print('queuing: call' + str(i))
        q.put('call' + str(i))

That gives me the numbers appearing in the browser at the same time as the debug output appears in the Python console.

But this seems to block:

def worker():
    while (True):
        item = q.get()

        if item is None:
            print('worker: item is none')
            break
        print('worker: ' + item)
        socketio.emit('update', item)
        q.task_done()


@socketio.on('my event2')
def threadtest(message):
    print('got the message')
    t = threading.Thread(target=worker)
    t.start()
    for i in range(100):
        print('queuing: call' + str(i))
        q.put('call' + str(i))
        socketio.sleep(0.1)  # socketio.sleep in the handler function

That gives me zero or one emits up front, and the rest of the emits only when threadtest finishes, which surprises me!

I've got it working now in any case, thanks, and will keep exploring

Isn't there a way to force emit to happen as soon as it is invoked? Adding sleep timers looks like a hack, not a real solution.

@cglacet it's not a hack. An asynchronous application relies on all tasks releasing the CPU voluntarily. A CPU release occurs when a task performs any I/O operation or threading operation, or when it calls the sleep function. If your task calls emit() and then does not do any I/O or threading operations for a while, the only way to give the few background tasks that SocketIO uses a chance to run is to sleep.
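The scheduling behaviour described above can be reproduced without Flask-SocketIO. The following is a minimal sketch that uses the standard library's asyncio as a stand-in for eventlet (an assumption, chosen only so the example runs anywhere). The names run, sender and outbox are hypothetical and are not part of Flask-SocketIO. The "emit" here only queues a message, and a separate background task does the "sending", which is the same division of labour the comment describes.

```python
import asyncio


async def run(yield_each_time):
    """Return the order of 'emit' and 'sent' events for a 3-message loop."""
    log = []
    outbox = asyncio.Queue()

    async def sender():
        # Hypothetical stand-in for the background task that writes to the socket.
        while True:
            msg = await outbox.get()
            if msg is None:  # sentinel: no more messages
                return
            log.append("sent " + msg)

    task = asyncio.create_task(sender())
    for i in range(3):
        msg = "call" + str(i)
        outbox.put_nowait(msg)  # like emit(): only queues the message
        log.append("emit " + msg)
        if yield_each_time:
            await asyncio.sleep(0)  # release the CPU so the sender can run
    outbox.put_nowait(None)
    await task
    return log


blocking_log = asyncio.run(run(False))
yielding_log = asyncio.run(run(True))
print(blocking_log)
print(yielding_log)
```

Without the sleep, every "emit" is logged before the first "sent"; with it, the two interleave. Under eventlet the equivalent yield in a handler would be socketio.sleep(0), as suggested earlier in the thread.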

I am facing a similar issue. The first emit happens, but it seems that the other emits are ignored by the time sio.sleep finishes, and execution waits at trainer.fit_model(). Any pointers on how to go about it?

@sio.on('runModel')
def timer(sid, data):
    data = get_form_data()
    config = map_request_data(data)
    trainer = DnnTrainer(config)
    sio.emit("runModel", "Starter Reading Data")
    sio.sleep(0.1)
    # this doesn't take time
    trainer.read_data()
    sio.emit("runModel", "Validating Config file")
    sio.sleep(0.1)
    # this doesn't take time
    trainer.validate_config()
    sio.emit("runModel", "Processing data")
    sio.sleep(0.1)
    # this doesn't take time
    trainer.process_data()
    sio.emit("runModel", "Building Model")
    sio.sleep(0.1)
    # this doesn't take time
    trainer.build_model()
    sio.emit("runModel", "Compiling Model")
    sio.sleep(0.1)
    # this doesn't take time
    trainer.compile_model()
    sio.emit("runModel", "Fitting the model")
    sio.sleep(0.1)
    # this is a long process
    trainer.fit_model()
    sio.emit("runModel", "Done")

You can't have a function that runs for a long time without yielding. If that function contains a lengthy process with a loop, then you need to add sleep calls inside the loop, so that the other tasks are always given a bit of CPU time.
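One way to follow this advice is to pass the yield function into the long-running loop. This is a minimal sketch with hypothetical names (fit_in_chunks, step, yield_control); it does not reflect how DnnTrainer is actually written, and it assumes the lengthy work can be broken into loop iterations.

```python
def fit_in_chunks(n_steps, step, yield_control, every=10):
    """Run step(i) for i in range(n_steps), yielding after every `every` steps."""
    for i in range(n_steps):
        step(i)  # one unit of the lengthy work
        if (i + 1) % every == 0:
            yield_control()  # give other tasks a bit of CPU time


# Stand-ins so the sketch runs on its own: record the steps and the yield points.
done = []
yields = []
fit_in_chunks(25, done.append, lambda: yields.append(len(done)), every=10)
print(yields)
```

In the handler above, yield_control could be something like lambda: sio.sleep(0), so that the pending emits get a chance to go out while the loop is still running.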
