Flask-socketio: Integration with Kafka

Created on 19 Jul 2017 · 13 Comments · Source: miguelgrinberg/Flask-SocketIO

Hi Miguel,

Just a quick question for you, and sorry if it is a silly one. Is it possible to emit socketio events from an external process using Kafka? More precisely, could an external process run the code below (adapted from the documentation)?

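from flask_socketio import SocketIO
from kafka import KafkaConsumer  # provided by the kafka-python package

socketio = SocketIO(message_queue='redis://')
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    socketio.emit('my event', {'data': 'foo'}, namespace='/test')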

Here message_queue would, I assume, point to my own Redis message queue, which Flask-SocketIO would use to communicate with the external process above. Are there any threading issues I would have to consider, such as monkey patching?

Thanks for your help

enhancement

All 13 comments

This should be fine, but make sure this secondary process uses the threading mode, so that the async frameworks do not interfere:

socketio = SocketIO(message_queue='redis://', async_mode='threading')
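
Putting the question and this answer together, the external process might look like the sketch below. It assumes the kafka-python package for KafkaConsumer, a Redis server at the default URL, and the topic, event name, and namespace from the original question. forward_messages and run are hypothetical helper names, not part of either library.

```python
def forward_messages(consumer, sio, event='my event', namespace='/test'):
    """Emit one Socket.IO event per Kafka record; return how many were sent."""
    sent = 0
    for record in consumer:
        # record.value is bytes unless the consumer has a value_deserializer
        payload = {'data': record.value.decode('utf-8')}
        sio.emit(event, payload, namespace=namespace)
        sent += 1
    return sent


def run():
    """Wire the helper to real Kafka and Redis (assumed to be running)."""
    # Third-party imports live here so forward_messages() can be imported
    # and tested without kafka-python or Flask-SocketIO installed.
    from flask_socketio import SocketIO
    from kafka import KafkaConsumer

    # No Flask app is passed: this instance only writes to the message queue.
    # async_mode='threading' keeps eventlet/gevent out of this process.
    socketio = SocketIO(message_queue='redis://', async_mode='threading')
    consumer = KafkaConsumer('my_favorite_topic')
    forward_messages(consumer, socketio)
```

Calling run() in the secondary process starts the loop. The Flask server itself would also need message_queue='redis://' in its own SocketIO() constructor so that it picks up what this process publishes.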

Hi Miguel,

Just another question on this. Is it possible to bypass this intermediate message queue and second process altogether and instead directly subscribe to a Kafka topic on the Flask server? Maybe something equivalent to the below? (I know the below is syntactically not correct by looking at the source)

socketio = SocketIO(message_queue='kafka://', ...)

In your opinion, what is the best approach to this problem that avoids another message queue? Could it be similar to the background_thread approach in the documentation for server-side message emitting? I have read through Kombu and it does not seem to support Kafka.

Thank you again for your generous support and this library

This is currently not an option that is available, but it is definitely possible.

If Kombu ever adds Kafka as one of its supported transports, then you would have the option to do this, without any changes to this project. Another alternative would be to add explicit support for Kafka in python-socketio. I have implemented Kombu and Redis as transports, and I have also accepted a ZeroMQ driver contributed by a user. If you want to look into adding Kafka yourself, I'm happy to provide guidance.

Sure, I will give it a shot. I'm looking at the KombuManager right now. Do I just have to subclass PubSubManager, use the appropriate methods from python-kafka, and override methods like _connection, _exchange, and _queue?

Right. All you need to do is provide your implementations of _publish() and _listen() in your subclass, plus any auxiliary methods you need. You may also need to do some conversion between a URL and the connection parameters python-kafka requires, if URLs aren't supported already. The redis driver is actually much simpler to start from, so take a look at that one too.
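
As a rough illustration of those two hooks, here is a sketch written as a mixin so that it runs without python-socketio or a Kafka client installed. It assumes the producer exposes send() and flush(), and that iterating the consumer yields records with topic and value attributes, which is how kafka-python's KafkaProducer and KafkaConsumer behave. The class name, the pickle encoding, and the parse_kafka_url helper are choices made for this example, not an existing driver.

```python
import pickle
from urllib.parse import urlparse


def parse_kafka_url(url):
    """Turn 'kafka://host:port' into the 'host:port' string a client expects."""
    parsed = urlparse(url)
    if parsed.scheme != 'kafka':
        raise ValueError('not a kafka:// URL: %r' % url)
    host = parsed.hostname or 'localhost'
    port = parsed.port or 9092  # 9092 is Kafka's default broker port
    return '%s:%d' % (host, port)


class KafkaPubSubMixin:
    """Hypothetical mixin holding the two hooks described above.

    Expects self.channel, self.producer and self.consumer to be set
    by the class it is mixed into.
    """

    def _publish(self, data):
        # Serialize the message and push it to the shared topic.
        self.producer.send(self.channel, value=pickle.dumps(data))
        self.producer.flush()

    def _listen(self):
        # Generator: yield each decoded message published on our topic.
        for record in self.consumer:
            if record.topic == self.channel:
                yield pickle.loads(record.value)
```

In a real driver this would presumably be combined with the base class, for example class KafkaManager(KafkaPubSubMixin, socketio.PubSubManager), with __init__ building self.producer and self.consumer from the parsed URL.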

Ok I think I understand. I have an app already as you might have guessed, do you have any suggestions/best practices on how to replicate the environment? I'm a bit of a beginner contributing to python open source. Here's my guess: in my requirements.txt I would use something like this:

-e git+https://github.com/mrwillis/Flask-SocketIO.git#egg=Flask-SocketIO
-e git+https://github.com/mrwillis/python-socketio.git#egg=python-socketio

And then just branch and start hacking, right? When I'm confident, would I just add two PRs on Flask-SocketIO and python-socketio?

Thanks again

There isn't a single way to go about this. Here is how I would do it:

  • Fork python-socketio and Flask-SocketIO.
  • Clone your forks of the two projects.
  • Create a virtualenv and then run the following to install your forks on it:

    cd python-socketio
    pip install -e .
    cd ../flask-socketio
    pip install -e .
    
  • Take the Flask-SocketIO example app (or your own app if you prefer) and make sure it works against Redis, for example. You'll need to add message_queue='redis://' to the SocketIO() constructor. And you will need to run a Redis instance on your machine. If the app is able to send and receive, then you are good.

  • Copy the Redis driver in python-socketio to a kafka_manager.py, and rename the class in it to KafkaManager. Leave the Redis code in it still.
  • Now go to Flask-SocketIO and add support for kafka:// as a message queue URL.
  • Make sure you can get the Redis driver to work when using the new kafka:// URL.
  • Now delete all the redis code from the kafka module, and rewrite for Kafka. Use the example app to test.
  • When you are done, you will have two PRs to submit, one for each project.
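
The kafka:// step in the list above amounts to choosing a manager class from the URL scheme. The sketch below shows that idea in isolation. The function name and the returned strings are illustrative (KafkaManager is the class the steps propose creating), the Kombu fallback is an assumption, and the actual selection code in Flask-SocketIO may be organized differently.

```python
def select_manager_name(url):
    """Map a message queue URL to the name of the manager class to use."""
    if url.startswith('redis://'):
        return 'RedisManager'
    if url.startswith('kafka://'):
        return 'KafkaManager'  # the new driver being added
    # Assumed fallback: hand any other URL to the Kombu-based manager.
    return 'KombuManager'
```

While the copied module still contains the Redis code, a kafka:// URL would select KafkaManager but actually talk to Redis, which is what makes the intermediate test in the steps possible.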

I hope this helps! Let me know if you also need help with the PR submission when you get to that point.

@miguelgrinberg Great thanks. It is clear now. I am making progress.

Is this issue resolved?

I am trying to send a message that is generated on the server side.

consumer.py

topicReply = 'botReply'
consumerReply = KafkaConsumer(topicReply, value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumerReply:
    fromConsumer(message.value)

server.py

app = Flask(__name__)
socketio = SocketIO(app)

def fromConsumer(msg):
    socketio.emit('reply', msg)

if __name__ == '__main__':
    socketio.run(app)

Then in my client JavaScript:

var socket = io.connect('http://localhost:1000/');
socket.on('reply', function(msg) {
    console.log('Received reply');
    console.log(msg);
});

This should show the received reply message in the browser's JavaScript console, but I don't see anything.
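
One possible explanation, offered as a guess since the thread does not confirm it: if consumer.py runs as its own process, the socketio object it reaches is not the server the browser is connected to, so the emit has no clients to deliver to. Below is a minimal sketch of running the consumer loop inside the server process instead, using Flask-SocketIO's start_background_task(). forward and start_forwarder are hypothetical names, and the consumer is assumed to be a kafka-python KafkaConsumer configured as in consumer.py.

```python
def forward(sio, consumer, event='reply'):
    """Loop over Kafka records and emit each deserialized value to clients."""
    for message in consumer:
        sio.emit(event, message.value)


def start_forwarder(sio, consumer):
    """Run forward() as a background task in the server's own async mode."""
    return sio.start_background_task(forward, sio, consumer)
```

In server.py this would be called once before socketio.run(app), for example start_forwarder(socketio, consumerReply). Under eventlet or gevent, the blocking Kafka client may also need monkey patching to cooperate with the server.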

@nahidalam Kafka support has never been implemented, at least I have not received any contributions for that backend.

Edit: actually I'm wrong, I forgot that I have it in my queue to review https://github.com/miguelgrinberg/python-socketio/pull/181.

@miguelgrinberg Could you merge it once it has been reviewed? This is a useful feature.

Looks like this has been merged and this can be closed.
https://github.com/miguelgrinberg/python-socketio/pull/181

Yeah, thanks.
