Hi Miguel,
Just a quick question for you, and sorry if it is a silly one. Is it possible to emit socketio events from an external process using Kafka? More precisely, in some external process, have the code below (as in the documentation):
from flask_socketio import SocketIO
from kafka import KafkaConsumer

socketio = SocketIO(message_queue='redis://')
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    socketio.emit('my event', {'data': 'foo'}, namespace='/test')
where message_queue, I guess, would point to my own Redis message queue, which Flask-SocketIO would use to communicate with my external process above. Are there any threading issues I would have to consider here, such as monkey patching?
Thanks for your help
This should be fine, but make sure this secondary process uses the threading mode, so that the async frameworks do not interfere:
socketio = SocketIO(message_queue='redis://', async_mode='threading')
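A minimal sketch of what that secondary process could look like, assuming the kafka-python package and a Redis instance on localhost. The forward() helper, the topic name, and the event name are placeholders for illustration, not part of either library. The third-party imports are kept inside main() so that the forwarding loop itself can be tried without a broker running.

```python
def forward(messages, emit, event='my event', namespace='/test'):
    """Emit one Socket.IO event per consumed message and return the count."""
    count = 0
    for msg in messages:
        # kafka-python records carry the raw payload as bytes in .value
        payload = msg.value.decode('utf-8')
        emit(event, {'data': payload}, namespace=namespace)
        count += 1
    return count


def main():
    # Imported here so forward() can be used without these packages installed.
    from flask_socketio import SocketIO
    from kafka import KafkaConsumer

    # This instance has no app attached: it only publishes to the queue
    # that the Flask-SocketIO server is listening on.
    socketio = SocketIO(message_queue='redis://', async_mode='threading')
    consumer = KafkaConsumer('my_favorite_topic')
    forward(consumer, socketio.emit)
```

Calling main() starts the loop. It blocks for as long as the consumer keeps iterating, which is fine here because this process does nothing else.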
Hi Miguel,
Just another question on this. Is it possible to bypass this intermediate message queue and second process altogether and instead directly subscribe to a Kafka topic on the Flask server? Maybe something equivalent to the below? (I know the below is syntactically not correct by looking at the source)
socketio = SocketIO(message_queue='kafka://', ...)
In your opinion, what is the best approach to this problem that avoids another message queue? Could it be similar to the background_thread approach in the documentation for server-side message emitting? I have read through Kombu and it does not seem to support Kafka.
Thank you again for your generous support and this library
This is currently not an option that is available, but it is definitely possible.
If Kombu ever adds Kafka as one of its supported transports, then you would have the option to do this, without any changes to this project. Another alternative would be to add explicit support for Kafka in python-socketio. I have implemented Kombu and Redis as transports, and I have also accepted a ZeroMQ driver contributed by a user. If you want to look into adding Kafka yourself, I'm happy to provide guidance.
Sure, I will give it a shot. I'm looking at the KombuManager right now. Do I just have to subclass PubSubManager, use the appropriate methods from python-kafka, and override methods like _connection, _exchange, and _queue?
Right. All you need to do is provide your implementations of _publish() and _listen() in your subclass, plus any auxiliary methods you need. You may also need to do some conversion between a URL and the connection parameters python-kafka requires, if URLs aren't supported already. The redis driver is actually much simpler to start from, so take a look at that one too.
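As a rough sketch of those two pieces (this is not the code of any merged driver): parse_kafka_url() and KafkaTransportSketch are hypothetical names, and pickle as the wire format is an assumption. The producer is assumed to expose send(topic, value=...) and flush(), as kafka-python's KafkaProducer does, and the consumer is assumed to be an iterable of records with .topic and .value. Both are passed in here so the sketch runs without a broker; in a real driver the class would subclass PubSubManager and create them in its constructor.

```python
import pickle
from urllib.parse import urlparse


def parse_kafka_url(url):
    """Turn 'kafka://host:port' into the 'host:port' string Kafka clients use."""
    parsed = urlparse(url)
    if parsed.scheme != 'kafka':
        raise ValueError('expected a kafka:// URL, got %r' % url)
    host = parsed.hostname or 'localhost'
    port = parsed.port or 9092  # 9092 is the conventional Kafka broker port
    return '%s:%d' % (host, port)


class KafkaTransportSketch:
    """Holds the two methods a PubSubManager subclass would provide."""

    def __init__(self, producer, consumer, channel='socketio'):
        self.producer = producer
        self.consumer = consumer
        self.channel = channel  # used as the Kafka topic name

    def _publish(self, data):
        # Serialize the message and push it to the shared topic.
        self.producer.send(self.channel, value=pickle.dumps(data))
        self.producer.flush()

    def _listen(self):
        # Yield decoded messages forever, ignoring records from other topics.
        for record in self.consumer:
            if record.topic == self.channel:
                yield pickle.loads(record.value)
```

With kafka-python, the string returned by parse_kafka_url() could be passed as bootstrap_servers when creating the producer and consumer.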
Ok I think I understand. I have an app already as you might have guessed, do you have any suggestions/best practices on how to replicate the environment? I'm a bit of a beginner contributing to python open source. Here's my guess: in my requirements.txt I would use something like this:
-e git+https://github.com/mrwillis/Flask-SocketIO.git#egg=Flask-SocketIO
-e git+https://github.com/mrwillis/python-socketio.git#egg=python-socketio
And then just branch and start hacking, right? When I'm confident, would I just add two PRs on Flask-SocketIO and python-socketio?
Thanks again
There isn't a single way to go about this. Here is how I would do it:
Create a virtualenv and then run the following to install your forks on it:
cd python-socketio
pip install -e .
cd ../flask-socketio
pip install -e .
Take the Flask-SocketIO example app (or your own app if you prefer) and make sure it works against Redis, for example. You'll need to add message_queue='redis://' to the SocketIO() constructor. And you will need to run a Redis instance on your machine. If the app is able to send and receive, then you are good.
... kafka:// as a message queue URL.
... kafka:// URL.
I hope this helps! Let me know if you also need help with the PR submission when you get to that point.
@miguelgrinberg Great thanks. It is clear now. I am making progress.
Is this issue resolved?
I am sending a message that is generated on the server side.
consumer.py
import json
from kafka import KafkaConsumer

topicReply = 'botReply'
consumerReply = KafkaConsumer(topicReply, value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumerReply:
    fromConsumer(message.value)
server.py
from flask import Flask
from flask_socketio import SocketIO

app = Flask(__name__)
socketio = SocketIO(app)

def fromConsumer(msg):
    socketio.emit('reply', msg)

if __name__ == '__main__':
    socketio.run(app)
Then in my client Javascript
var socket = io.connect('http://localhost:1000/');
socket.on('reply', function(msg) {
    console.log('Received reply');
    console.log(msg);
});
This should show me the received reply message in the browser javascript console but I don't see anything.
@nahidalam Kafka support has never been implemented, at least I have not received any contributions for that backend.
Edit: actually I'm wrong, I forgot that I have it in my queue to review https://github.com/miguelgrinberg/python-socketio/pull/181.
@miguelgrinberg Could you merge it once it has been reviewed? This is a useful feature.
Looks like this has been merged and this can be closed.
https://github.com/miguelgrinberg/python-socketio/pull/181
Yeah, thanks.