Hi, @miguelgrinberg first, thanks for you great job.
I'm trying to push message from another thread or another server machine with redis pub/sub
and i've never did thing like this before, i want to know if in that way everything will be ok.
that code i posted here is working, but never did this before, i don't even trust my own code haha....
please give me so advice. thanks.
import ast
from app import socket_io
from app import user_data
from app.database import redis, Lobby, LobbyState
from flask import session
from flask_socketio import Namespace, join_room, leave_room, emit
class Socket(Namespace):
def on_message(self, data):
""" Send any data to lobby(room) """
decode = ast.literal_eval(data['data'].decode('utf-8'))
socket_io.emit('lobby', decode, namespace='/lobby', room=self.room)
def sub(self, room=None):
if not room:
return
self.room = room
if hasattr(self, 'subscribed'):
if self.subscribed:
return
self.pubsub = redis.pubsub()
self.client = redis
print("Subscribing to ", self.room)
self.pubsub.subscribe(**{self.room: self.on_message})
self.subscribed = True
self.pubsub.run_in_thread(sleep_time=0.001)
def un_sub(self):
if hasattr(self, 'room'):
self.pubsub.unsubscribe(self.room)
self.subscribed = False
self.room = None
def pub(self, data):
self.client.publish(self.room, data)
def on_sv_lobby(self, data):
if not data:
return
if data['type'] == 'join':
try:
# Get the lobby where data['lobby_id'} is equal.
lob = Lobby.get(Lobby.owner == data['lobby_id'])
# check the permissions
if not lob.can_join():
return
# Checking the lobby state
if lob.state == LobbyState.LOBBY_STATE_UNUSED:
return
# User can be in multiple rooms
for lob in Lobby.select().where(
(Lobby.members.contains(session.get('steamid'))) & (Lobby.owner != data['lobby_id'])):
# Leave every other rooms
lob.leave_lobby()
# join to room
lob.join_lobby()
except Lobby.DoesNotExist:
print("lobby does not exist!")
# Subscribe
self.sub(room=data['lobby_id'])
# Join sockets room
join_room(data['lobby_id'])
session['lobby_id'] = data['lobby_id']
if data['type'] == 'chat':
# publish data
self.pub(data)
def on_disconnect(self):
# if user has not lobby_id it means that they aren't in any room
if not session.get('lobby_id'):
return
try:
# Get lobby
lobby = Lobby.get(Lobby.owner == session.get('lobby_id'))
# Then leave.
lobby.leave_lobby()
# unsubscribe last room
self.un_sub()
except Lobby.DoesNotExist:
print("Lobby does not exist")
finally:
# tell to users who in that lobby who disconnected
emit('lobby', {
'type': 'leave',
'user': user_data()['steamid'],
}, room=session.get('lobby_id'))
leave_room(session.get('lobby_id'))
session['lobby_id'] = None
socket_io.on_namespace(Socket('/lobby'))
Seems you have manually implemented pub/sub. Any reason you did not use Flask-SocketIO's own support for emitting from other processes?
@miguelgrinberg of course i could use flask-socketio's emitting but if im not wrong i have 2 separated server so i had to implement redis pubsub. do you have any suggestions ?
(edit) sounds like i didn't read the document there was a support for redis already.
is this the same thing of what i have done ?
Yes :)
This is what you can use instead of rolling your own: https://flask-socketio.readthedocs.io/en/latest/#emitting-from-an-external-process.
thank you! @miguelgrinberg i was bad, should have a look on the document first.
i'm gonna recode the part i've done.
Finally, if i want to pubsub from another server or a game plugin that have pubsub, What should I do?
as this code is listening to socketio should i publish a message to socketio ? https://github.com/miguelgrinberg/python-socketio/blob/cc9027586f045b6aa96e832bb287971762fb339d/socketio/redis_manager.py#L49-L56
This is explained in the link I gave you. What you do is this:
socketio = SocketIO(message_queue='redis://')
socketio.emit('my event', {'data': 'foo'}, namespace='/test')
@miguelgrinberg thanks for posting that, it is exactly what I've been looking for. Is there an example of using that code in an application? Would that code be called from a celery background task?
@griffin2000 my Flack application is probably the best example of using this technique. The emit() line works either on the main server, or in an auxiliary process such as a Celery worker. The difference between the two is in how the socketio object is initialized. The syntax shown above, where just the message_queue argument is passed, is appropriate for an external process or Celery worker. The same object is initialized as follows for the main server(s):
socketio = SocketIO(app, message_queue='redis://')
Most helpful comment
Yes :)
This is what you can use instead of rolling your own: https://flask-socketio.readthedocs.io/en/latest/#emitting-from-an-external-process.