Flask-socketio: When two Socket.IO(JS) client connected. All of the clients become choppy!

Created on 22 Apr 2019  Â·  21Comments  Â·  Source: miguelgrinberg/Flask-SocketIO

I'm trying to broadcast images(.png) to web using socketIO.
When only one client connected to the server, everything is ok. But when another client connected, all clients become choppy.
It seems that the two client take turns to receive message from server. Not at the same time.
Did i miss any settings?
Here is my code of server.

app = Flask(__name__)
socketio = SocketIO()

# receive frame 
def recv_task(protobuf_que):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.bind("tcp://*:9988")
    while True:
        msg = sock.recv()
        m = msg_pb2.OcvMat()
        m.ParseFromString(msg)
        if m.mat_data != b'':           
            protobuf_que.put(m)


def encode_task(protobuf_que):
    temp_io = SocketIO(message_queue='redis://')
    v_writer = None
    while True:
        if not protobuf_que.empty():
            m = protobuf_que.get()
            # m is raw data of bgr image   
                        # use opencv encode m to png then emit png(base64encode).
                        temp_io.emit("frame",  png , braodcast=True)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    protobuf_que = manager.Queue()

    encodeProcess = Process(target = encode_task, args=(protobuf_que,))
    encodeProcess.start()

    recvProcess = Process(target = recv_task, args=(protobuf_que,))
    recvProcess.start()

    socketio.init_app(app,  message_queue = 'redis://')
    socketio.run(app, host='0.0.0.0', port=5000)
    recvProcess.join()
    encodeProcess.join()

And here is the code in js

 socket.on("frame", function(data){
                var msg = JSON.parse(data);
                gList = msg.points;
                var bkImg = new Image();
                var srcStr = "data:image/png;base64," + msg.b64;
                bkImg.src = srcStr;
                bkImg.setAttribute('crossOrigin', 'anonymous');
                bkImg.onload = function()
                {
                    $('#can').drawImage({
                        source: bkImg,
                        x:0, y:0,
                        fromCenter: false,
                        load: drawPts
                    });

                }

Did i miss something?
I'll appreciate it very much if you could help me

question

All 21 comments

It seems that when more clients connected, the emit function get slow.
When i close some clients, it get faster then.
What did i miss?

Are you using an async framework? You are using libraries that are not async friendly, specifically multiprocessing, the Process class and zmq. What I think is happening is that the blocking functions in these libraries are slowing everything down, and the more clients you have the more you rely on them so it gets worse.

Are you using an async framework? You are using libraries that are not async friendly, specifically multiprocessing, the Process class and zmq. What I think is happening is that the blocking functions in these libraries are slowing everything down, and the more clients you have the more you rely on them so it gets worse.

Thank you for your reply.
I'm trying to move emit to the main process where hold the flask_app.
Here is my code.

import eventlet
eventlet.monkey_patch()
from eventlet.green import zmq
from multiprocessing import Process
import multiprocessing
from multiprocessing import Queue

app = Flask(__name__)

app.config.update(
    SECRET_KEY = 'secret',
    ALLOWED_EXTENSIONS = set('txt pdf png jpg jpeg pkg'.split()),
    MAX_CONTENT_LENGTH = 50*1024*1024,
    SOCKETIO_MESSAGE_QUEUE = "redis://"
)
app.debug = False
app.threaded = True

def recv_task(protobuf_que):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.bind("tcp://*:9988")
    v_writer = None
    while True:
        msg = sock.recv()
        m = msg_pb2.OcvMat()
        m.ParseFromString(msg)
        if m.mat_data != b'':
            # encode
            pt_list = list()
            for i in  m.points:
                pt_list.append({"x":i.x, "y":i.y})          
            nparr = np.fromstring(m.mat_data, np.uint8).reshape(m.rows, m.cols, m.channel)              
            # record video 
            t1 = time.time()
            if v_writer is None:
                v_writer = VideoWriter(m.cols, m.rows, False)
            t_ms = time.time()
            v_writer.write(nparr, t_ms)
            t2 = time.time()
            print('+++ Video cost:%f +++\n' % (t2-t1,))
            protobuf_que.put({"mat":nparr, "points":pt_list})

def encode_task(protobuf_que, encode_que):
    while True:
        if not protobuf_que.empty():
            info = protobuf_que.get()
            t1 = time.time()
            Img = cv2.imencode('.png', info['mat'])[1].tobytes()
            t2 = time.time()
            print("+++pid:%d ,Encode cost:%f +++\n" % (os.getpid(), t2-t1))
            encode_que.put(ujson.dumps({'b64': base64.b64encode(Img).decode(), "points":info['points']}))

def recv_thread(args):
    while True:
        if not args.empty():
            content = args.get()
            socketio.emit("frame", content, broadcast = True)
        else:
            socketio.sleep(0)

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    protobuf_que = manager.Queue()
    encode_que = manager.Queue()

    sub_process =  Process(target = recv_task, args=(protobuf_que,))
    sub_process.start()

    for i in range(4):
        pros = Process(target=encode_task, args=(protobuf_que, encode_que))
        pros.start()

    with thread_lock: 
        if thread is None:
            thread = socketio.start_background_task(target=recv_thread, args=encode_que)

    socketio.run(app, host='0.0.0.0', port=5000)
    sub_process.join()

It improve a lot. It won't be choppy. However, when more clients connected, it get slow too.
When only one client left, i can see receive message faster on the web. Then back to normal.

I don't emit on the child process. The child process only do some cpu task. Is this the same problem you mentioned above.
How can i fixed it?
By the way, i have used
import eventlet eventlet.monkey_patch() and 'from eventlet.green import zmq'

As I mentioned above, multiprocessing is not compatible with eventlet.

As I mentioned above, multiprocessing is not compatible with eventlet.

First of all, i would like to express my gratitude for your guidance and patience.
But in my program. There are some CPU-bound task. So I have to use multiprocess.
Is there any alternatives of multiprocessing? Or it there other way to deal with CPU-bound task?

There are many different ways to do this, but I don't know any that uses the multiprocessing interface. If you run the CPU heavy tasks in the same process you can call sleep(0) often to allow other tasks to run in parallel to the CPU consuming task. If you want to leave the CPU heavy tasks in other processes, then start those other processes separately and use a supported method of communication such as sockets, queues, etc.

There are many different ways to do this, but I don't know any that uses the multiprocessing interface. If you run the CPU heavy tasks in the same process you can call sleep(0) often to allow other tasks to run in parallel to the CPU consuming task. If you want to leave the CPU heavy tasks in other processes, then start those other processes separately and use a supported method of communication such as sockets, queues, etc.

Thank you for your suggestion.
I'm trying to use celery to replace multiprocessing.
I'll continue to report my results.
:)

I'm very disappointed that celery doesn't meet my needs.
The serialize cost too much time,
I'm serialize a string, which might be 2MB.
Each string cost about 20 ms.
Did i do something wrong?
I'm trying to do CPU heavy task in other process, before put into flask.

Celery is not a good idea for high frequency tasks. But in general, using a
separate process you will always have to send your data between processes.
You can do it more efficiently if you implement this without a message
queue, but not sure if it will be fast enough for your needs.

On Tue, Apr 23, 2019, 2:53 PM StartAt24 notifications@github.com wrote:

I'm very disappointed that celery doesn't meet my needs.
The serialize cost too much time,
I'm serialize a string, which might be 2MB.
Each string cost about 20 ms.
Did i do something wrong?
I'm trying to do CPU heavy task in other process, before put into flask.

—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
https://github.com/miguelgrinberg/Flask-SocketIO/issues/955#issuecomment-485812550,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAUXBTVGJOVDKZ2Y3G5ZNCDPR4IG3ANCNFSM4HHOH4GA
.

Celery is not a good idea for high frequency tasks. But in general, using a separate process you will always have to send your data between processes. You can do it more efficiently if you implement this without a message queue, but not sure if it will be fast enough for your needs.
…
On Tue, Apr 23, 2019, 2:53 PM StartAt24 @.*> wrote: I'm very disappointed that celery doesn't meet my needs. The serialize cost too much time, I'm serialize a string, which might be 2MB. Each string cost about 20 ms. Did i do something wrong? I'm trying to do CPU heavy task in other process, before put into flask. — You are receiving this because you commented. Reply to this email directly, view it on GitHub <#955 (comment)>, or mute the thread https://github.com/notifications/unsubscribe-auth/AAUXBTVGJOVDKZ2Y3G5ZNCDPR4IG3ANCNFSM4HHOH4GA .

Hi again.
I have found something, that might be helpful. I have remove multiprocessing from my project.
But when more clients connected,it get slow too.
It seems that the emit did not broadcast to every client at the same time.
It take turns to send message to each client.
So when i close other clients. The only client left back to normal.
Did i miss any param for emit?

I have read your another article https://blog.miguelgrinberg.com/post/video-streaming-with-flask
It works great for me! When more clients connected, it won't slow down.
It confused me that what are the differences between emit on flask-socketio with this video-streaming-with-flask.
I have tried use from eventlet import greenpool to emit message at the same time. But it still slow down,when more clients connected.
So is there anything that i can do to make emit to many clients at the same time.
If i can solve this problem, i think my program will work fine.

I'm not sure I understand. Each client has a network socket open. You need to write your data on all these sockets, from a process that runs on a single CPU. How can you issue these writes at the exact same time? The only way to write is to issue a loop and write to one client at a time. The amount of data that you are writing and/or the processing that you do on this data might be adding delays, so you should concentrate on the performance of your application because there is nothing you can do on the networking side.

I have read your another article https://blog.miguelgrinberg.com/post/video-streaming-with-flask
It works great for me! When more clients connected, it won't slow down.
It confused me that what are the differences between emit on flask-socketio with this video-streaming-with-flask.

That application uses multithreading instead of eventlet. It cannot scale to large number of clients like eventlet can because of that. It also does not do any processing on the video, it just sends the video frames as they arrive from the camera, and these frames are heavily compressed so they are small.

You can use Socket.IO in multithreading mode, but in this mode there is no support for WebSocket, so you will lose performance on that side.

Thanks for your help and patience.
My program runs on a multi-core CPU. What i understand is that, the flask-socketio runs only in one core.
So that emit sends message only in one core.
What i want to do is:

(same time)
emit('frame', frame)  --> clientA
emit('frame', frame)  --> clientB
emit('frame', frame)  --> clientC
...

For now, because the flask-socketio runs on only one core. So it takes turns to send to clientA, clientB,clientC.
I don't know how to use more cores.
I have tried this: (I hacked sid to the target to emit not to skip)

def _send_message(sid, content):
    socketio.emit("frame", content, skip_sid=sid)

from eventlet import greenpool
pool.spawn_n(_send_message, sid, content)

But it does't work.
Can you give some suggest, what i can do to emit at the same time.

Here is my code

import eventlet
eventlet.monkey_patch()

import os
from flask import Flask, request, session, g, redirect, url_for, abort, \
    render_template, flash, jsonify, send_from_directory, make_response
import ujson

from eventlet.green import threading
from flask_socketio import SocketIO, emit
import time ,cv2, msg_pb2, base64
from eventlet.green import zmq

app = Flask(__name__)

app.config.update(
    SECRET_KEY = 'secret',
)


app.debug = False
app.threaded = True
clients = set()
pool = greenpool.GreenPool(20)
ctx = None
sock = None

def init_zmq():
    global ctx 
    ctx = zmq.Context()
    global sock
    sock = ctx.socket(zmq.ROUTER)
    sock.bind("tcp://*:9988")

def recv():
    msg = sock.recv()
    m = msg_pb2.OcvMat()        
    m.ParseFromString(msg)
    pt_list = list()
    for i in m.points:
        pt_list.append({"x":i.x, "y":i.y})
    if m.rows == 0:
        return 0,0
    nparr = np.fromstring(m.mat_data, np.uint8).reshape(m.rows, m.cols, 1)
    pngImg = cv2.imencode('.png', nparr)[1].tobytes()
    return base64.b64encode(pngImg).decode(), pt_list

def recv_thread():
    while True:
        img, pt_list = recv()
        if img != 0:            
            content = ujson.dumps({"b64" : img, "points" : pt_list})
            tmp_clients = None
            for sid in clients.copy():
                pool.spawn_n(_send_message, sid, content)
        else:
            socketio.sleep(0)


if __name__ == '__main__':
    init_zmq()
    socketio.start_background_task(target=recv_thread)
    socketio.run(app, host='0.0.0.0', port=5000)

There is no way to do what you want in Python. Python programs always run one thread at a time. I'm really not sure what else to suggest, if you are sending too much data it is expected that things are going to slow down as the number of clients increase because your network has a limit.

There is no way to do what you want in Python. Python programs always run one thread at a time. I'm really not sure what else to suggest, if you are sending too much data it is expected that things are going to slow down as the number of clients increase because your network has a limit.

But,why video-streaming-with-flask works fine?
It sends image in one thread too.

I don't think you understand how the streaming example works. Each client uses a separate thread. The frames aren't synchronized either, each client may receive frames at a different time.

I don't think you understand how the streaming example works. Each client uses a separate thread. The frames aren't synchronized either, each client may receive frames at a different time.

What i mean is that, how can i get the same result in flask-socketio as video-streaming-with-flask?
If you rewrite video-streaming-with-flask with flask-socketio, what will you do?
Can you show me some example? I really don't know what to do.

I don't have any examples to show you, sorry. Not even sure it is a good idea to use Socket.IO since eventlet would be likely incompatible with reading frames from a camera.

If you like the streaming solution, then I suggest you implement your application with streaming, I don't understand why you want to use Socket.IO in spite of all the difficulties you are finding.

Because i thank Socket.IO reach my needs.
I'm receiving a frame(a BGR frame) from other machine through ZMQ. Then i do some process with the frame, and encode it to png.
I just want Socket.IO help me sending png to web client. Don't need to ensure all the client to have the same png at the same time. But need to ensure that each clients can get the png smoothly.
I thank that Socket.IO can handle it. Am i wrong?
Another reason is that working with flask-socketio and socket.io.js is very convenient.
:)

@StartAt24 you are asking me if your problem can be solved with Python and I can't answer that, because I don't have the information. As I said above, async applications are not good with heavy CPU processing, so I'm not surprised you have problems, but I can't really comment on the specifics of your case.

@miguelgrinberg sorry for take up too much of your time.
I thank the the difficulties I have encountered is due to my lack of asio , eventlet and 'socketio' 's knowledge.
I found a way to hack my problem. I started a new websocket server with python-websocket-server to push image to web.And use flask-socketio to deal with other message.

I learned a lot from communicating with you. Thank you for your help sincerely

Was this page helpful?
0 / 5 - 0 ratings

Related issues

huangganggui picture huangganggui  Â·  3Comments

lnunno picture lnunno  Â·  4Comments

dlernz picture dlernz  Â·  4Comments

ypperlig picture ypperlig  Â·  5Comments

nh916 picture nh916  Â·  3Comments