Flask-socketio: (Question) Combining Redis pub/sub with Sockets

Created on 10 Nov 2016 · 7 Comments · Source: miguelgrinberg/Flask-SocketIO

Hi @miguelgrinberg, first of all, thanks for your great work.
I'm trying to push messages from another thread or another server machine using Redis pub/sub.
I've never done anything like this before, so I'd like to know whether this approach is sound.

The code I've posted here works, but since this is new to me, I don't fully trust my own code, haha.
Please give me some advice. Thanks.

import ast

from app import socket_io
from app import user_data
from app.database import redis, Lobby, LobbyState

from flask import session
from flask_socketio import Namespace, join_room, leave_room, emit


class Socket(Namespace):
    def on_message(self, data):
        """ Send any data to lobby(room) """
        decode = ast.literal_eval(data['data'].decode('utf-8'))
        socket_io.emit('lobby', decode, namespace='/lobby', room=self.room)

    def sub(self, room=None):

        if not room:
            return

        self.room = room

        if hasattr(self, 'subscribed'):
            if self.subscribed:
                return

        self.pubsub = redis.pubsub()
        self.client = redis
        print("Subscribing to ", self.room)
        self.pubsub.subscribe(**{self.room: self.on_message})
        self.subscribed = True
        self.pubsub.run_in_thread(sleep_time=0.001)

    def un_sub(self):
        if hasattr(self, 'room'):
            self.pubsub.unsubscribe(self.room)
            self.subscribed = False
            self.room = None

    def pub(self, data):
        self.client.publish(self.room, data)

    def on_sv_lobby(self, data):
        if not data:
            return
        if data['type'] == 'join':
            try:
                # Get the lobby whose owner matches data['lobby_id'].
                lob = Lobby.get(Lobby.owner == data['lobby_id'])
                # check the permissions
                if not lob.can_join():
                    return
                # Checking the lobby state
                if lob.state == LobbyState.LOBBY_STATE_UNUSED:
                    return
                # A user may still be a member of other lobbies
                for other in Lobby.select().where(
                                (Lobby.members.contains(session.get('steamid'))) & (Lobby.owner != data['lobby_id'])):
                    # Leave every other lobby first
                    other.leave_lobby()
                # Then join the requested lobby; the loop variable no longer shadows `lob`
                lob.join_lobby()
            except Lobby.DoesNotExist:
                print("lobby does not exist!")
            # Subscribe
            self.sub(room=data['lobby_id'])
            # Join sockets room
            join_room(data['lobby_id'])
            session['lobby_id'] = data['lobby_id']
        if data['type'] == 'chat':
            # publish data
            self.pub(data)

    def on_disconnect(self):
        # if user has not lobby_id it means that they aren't in any room
        if not session.get('lobby_id'):
            return
        try:
            # Get lobby
            lobby = Lobby.get(Lobby.owner == session.get('lobby_id'))
            # Then leave.
            lobby.leave_lobby()
            # unsubscribe last room
            self.un_sub()
        except Lobby.DoesNotExist:
            print("Lobby does not exist")
        finally:
            # Tell the users still in that lobby who disconnected
            emit('lobby', {
                'type': 'leave',
                'user': user_data()['steamid'],
            }, room=session.get('lobby_id'))
            leave_room(session.get('lobby_id'))
            session['lobby_id'] = None

socket_io.on_namespace(Socket('/lobby'))
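
A side note on the payload format in the code above: `on_message` decodes the Redis message with `ast.literal_eval`, which only works when the publisher sent the `repr` of a Python literal. A minimal sketch of a JSON-based alternative follows. The helper names `encode_payload` and `decode_message` are hypothetical and not part of Flask-SocketIO or redis-py. The sketch assumes the handler receives the usual redis-py pub/sub message dict, with the body under the `data` key as bytes.

```python
import json


def encode_payload(payload):
    """Serialize a dict to a JSON string that can be passed to publish()."""
    return json.dumps(payload)


def decode_message(message):
    """Turn a pub/sub message dict back into a Python object.

    Assumes `message['data']` holds the published body, either as bytes
    (the default) or as str (when the client decodes responses).
    """
    raw = message['data']
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8')
    return json.loads(raw)
```

With helpers like these, `pub` could publish `encode_payload(data)` and `on_message` could emit `decode_message(data)`, so a publisher written in another language (for example a game plugin) only has to produce JSON.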
All 7 comments

Seems you have manually implemented pub/sub. Any reason you did not use Flask-SocketIO's own support for emitting from other processes?

@miguelgrinberg Of course I could use Flask-SocketIO's emit, but if I'm not wrong, I have two separate servers, so I had to implement Redis pub/sub myself. Do you have any suggestions?

(Edit) It looks like I didn't read the documentation; there is already support for Redis.
Is this the same thing as what I have done?

Yes :)

This is what you can use instead of rolling your own: https://flask-socketio.readthedocs.io/en/latest/#emitting-from-an-external-process.

Thank you, @miguelgrinberg! My bad, I should have looked at the documentation first.
I'm going to rewrite the part I've done.

Finally, if I want to publish from another server, or from a game plugin that has its own pub/sub, what should I do?

Since this code is listening to socketio, should I publish a message to socketio? https://github.com/miguelgrinberg/python-socketio/blob/cc9027586f045b6aa96e832bb287971762fb339d/socketio/redis_manager.py#L49-L56

This is explained in the link I gave you. What you do is this:

socketio = SocketIO(message_queue='redis://')
socketio.emit('my event', {'data': 'foo'}, namespace='/test')

@miguelgrinberg thanks for posting that, it is exactly what I've been looking for. Is there an example of using that code in an application? Would that code be called from a celery background task?

@griffin2000 my Flack application is probably the best example of using this technique. The emit() line works either on the main server, or in an auxiliary process such as a Celery worker. The difference between the two is in how the socketio object is initialized. The syntax shown above, where just the message_queue argument is passed, is appropriate for an external process or Celery worker. The same object is initialized as follows for the main server(s):

socketio = SocketIO(app, message_queue='redis://')
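
To tie the two initializations together, here is a minimal sketch of how an auxiliary process might wrap the emit. The function names `notify_lobby` and `make_external_emitter` are hypothetical. The `'lobby'` event and `/lobby` namespace are borrowed from the code in the question, and the `redis://` URL is assumed to match the one passed to the main server(s).

```python
def notify_lobby(sio, lobby_id, payload):
    """Emit `payload` as a 'lobby' event to every client in room `lobby_id`.

    `sio` can be the main server's SocketIO object or the write-only one
    returned by make_external_emitter() below.
    """
    sio.emit('lobby', payload, namespace='/lobby', room=lobby_id)


def make_external_emitter(url='redis://'):
    """Build a SocketIO object for a process that has no Flask app."""
    # Imported lazily so notify_lobby() can be exercised without the package.
    from flask_socketio import SocketIO
    # Only message_queue is passed, as described above for external processes.
    return SocketIO(message_queue=url)


# Usage in a Celery task or a standalone script (needs a reachable Redis):
#     sio = make_external_emitter()
#     notify_lobby(sio, '1234', {'type': 'chat', 'text': 'hello'})
```

Passing the `sio` object in as a parameter keeps the emitting code identical on the main server and in the worker; only the way the object is constructed differs.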