Sanic: document Server-Sent Events (SSE)/EventSource

Created on 3 May 2017 · 13 Comments · Source: sanic-org/sanic

I've been trying to figure out the best way to do SSE using Sanic, and was wondering if that's on the roadmap somehow. I've been successful with aiohttp, but cannot see a clear way to hold on to client connections.

needs investigation

Most helpful comment

I think this will work.
I tested it in Firefox.
It uses Sanic's streaming response.

from sanic import Sanic
from sanic.response import file, stream
import asyncio
from sanic.server import HttpProtocol

app = Sanic(__name__)


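class CustomHttpProtocol(HttpProtocol):

    def on_message_complete(self):
        # Raise the request timeout for the SSE route so the long-lived
        # stream is not cut off by the default timeout.
        if self.url == b'/sse':
            self.request_timeout = 1000
        super().on_message_complete()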


@app.route('/')
async def index(request):
    return await file('index.html')


@app.route('/sse')
async def sse(request):
    async def sample_streaming_fn(response):
        i = 1
        while True:
            await asyncio.sleep(1)
            # One SSE event: a "data:" field terminated by a blank line.
            s = 'data: ' + str(i) + '\r\n\r\n'
            response.write(s.encode())
            i += 1
    return stream(sample_streaming_fn, content_type='text/event-stream')

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8000, protocol=CustomHttpProtocol)

index.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8" />
        <title>Server Sent Events</title>
    </head>
    <body>
        <div id="time">
            Time
        </div>
        <script>
            let eventSource = new EventSource('/sse');
            eventSource.addEventListener('message', function(e) {
                console.log(e.data)
                document.getElementById('time').textContent = e.data;
            }, false);
        </script>
    </body>
</html>
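
The handler above builds each event by hand as 'data: ...' followed by a blank line. As a rough sketch of the same wire format, a small helper could also cover the optional event, id and retry fields and multi-line payloads. The name format_sse and its parameters are made up for illustration and are not part of Sanic. The example uses '\n' line endings, which the event-stream format accepts alongside the '\r\n' used above.

```python
def format_sse(data, event=None, event_id=None, retry=None):
    """Serialize one Server-Sent Event as bytes (hypothetical helper, not a Sanic API)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        # Reconnection delay hint for the browser, in milliseconds.
        lines.append(f"retry: {int(retry)}")
    # A payload with embedded newlines needs one "data:" line per line.
    for part in str(data).splitlines() or [""]:
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

Inside sample_streaming_fn this would presumably be used as response.write(format_sse(i)).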

All 13 comments

Let's keep this open, since we should probably document it. Thanks @38elements for the example.

Hey there. Is this still valid for 0.6+? I can't get it to work.

@rcarmo I'm using Sanic 0.7.0 here and it works flawlessly. Can you provide more information on what's not working?

I moved on in the meantime and used another solution, but I tried the example and it now works on a fresh install. No way to reproduce the original situation...

@rcarmo oh, ok! Case closed, then :smile:

Well, there is this one thing... What happens if I get a RequestTimeout or ConnectionClosed during streaming? How do I clean up, say, database connections or internal message broker handles that I open inside sample_streaming_fn?

@rcarmo interesting question. I think a simple try/except would suffice ... Here's the modified route:

@app.route('/sse')
async def sse(request):
    async def sample_streaming_fn(response):
        try:
            print('starting ...')
            i = 1
            while True:
                await asyncio.sleep(1)
                s = 'data: ' + str(i) + '\r\n\r\n'
                response.write(s.encode())
                i += 1
        except Exception as e:
            print('gone!')
            print(e)
            print(e.__class__)
            print('')
    return stream(sample_streaming_fn, content_type='text/event-stream')

And here's the output after two reloads in the browser:

$ python testsse.py
[2017-12-13 14:48:05 -0200] [18339] [INFO] Goin' Fast @ http://127.0.0.1:8000
[2017-12-13 14:48:05 -0200] [18339] [INFO] Starting worker [18339]
[2017-12-13 14:48:06 -0200] - (sanic.access)[INFO][1:2]: GET http://127.0.0.1:8000/  200 494
starting ...
[2017-12-13 14:48:18 -0200] - (sanic.access)[INFO][1:2]: GET http://127.0.0.1:8000/  200 494
starting ...
gone!
unable to perform operation on <TCPTransport closed=True reading=False 0x5617167918e8>; the handler is closed
<class 'RuntimeError'>

[2017-12-13 14:48:31 -0200] - (sanic.access)[INFO][1:2]: GET http://127.0.0.1:8000/  200 494
starting ...
gone!
unable to perform operation on <TCPTransport closed=True reading=False 0x5617167288d8>; the handler is closed
<class 'RuntimeError'>

gone!
unable to perform operation on <TCPTransport closed=True reading=False 0x5617167295e8>; the handler is closed
<class 'RuntimeError'>

^C[2017-12-13 14:48:41 -0200] [18339] [INFO] Stopping worker [18339]
[2017-12-13 14:48:41 -0200] [18339] [INFO] Server Stopped

Every 'starting ...' line is matched by a 'gone!' line, so ... I hope this helps :wink:
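
For the cleanup part of the question (database connections, broker handles), one option is to put the release in a finally block so it runs however the loop ends. The following is only a sketch that runs without Sanic: ClosedAfter is a made-up stand-in for the response object that raises the same RuntimeError seen in the log above, and the resource dict is a placeholder for whatever handle needs closing.

```python
import asyncio


class ClosedAfter:
    """Hypothetical stand-in for the response: fails after `limit` writes."""

    def __init__(self, limit):
        self.limit = limit
        self.written = []

    def write(self, chunk):
        if len(self.written) >= self.limit:
            raise RuntimeError("the handler is closed")
        self.written.append(chunk)


async def streaming_fn(response, resource, delay=0.0):
    resource["open"] = True  # placeholder for opening a DB connection or broker handle
    try:
        i = 1
        while True:
            await asyncio.sleep(delay)
            response.write(f"data: {i}\n\n".encode())
            i += 1
    except RuntimeError:
        pass  # the client went away; nothing more to send
    finally:
        # Runs on the error path and also if the task is cancelled.
        resource["open"] = False


def run_demo():
    response = ClosedAfter(limit=3)
    resource = {}
    asyncio.run(streaming_fn(response, resource))
    return response.written, resource
```

In a real handler the ClosedAfter object would be replaced by the response that Sanic passes to the streaming function.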

Seems to work. I'm wrapping only response.write, since that is the call that blows up when the connection is closed.

This pattern is useful when you subscribe to an internal pub/sub topic before entering the while loop. I'm wrapping response.write, then unsubscribing and breaking out of the loop in the except block.
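
A minimal sketch of that pub/sub variant, under stated assumptions: Broker is a hypothetical in-process pub/sub built on asyncio.Queue (a real broker client would differ), and OneShotResponse is a stand-in that fails on its second write to simulate a closed connection.

```python
import asyncio


class Broker:
    """Hypothetical in-process pub/sub, standing in for a real message broker."""

    def __init__(self):
        self.subscribers = set()

    def subscribe(self):
        queue = asyncio.Queue()
        self.subscribers.add(queue)
        return queue

    def unsubscribe(self, queue):
        self.subscribers.discard(queue)

    def publish(self, message):
        for queue in self.subscribers:
            queue.put_nowait(message)


class OneShotResponse:
    """Stand-in response: the second write fails as if the client had left."""

    def __init__(self):
        self.written = []

    def write(self, chunk):
        if self.written:
            raise RuntimeError("the handler is closed")
        self.written.append(chunk)


async def stream_topic(response, broker):
    queue = broker.subscribe()  # subscribe before entering the loop
    while True:
        message = await queue.get()
        try:
            response.write(f"data: {message}\n\n".encode())
        except RuntimeError:
            # Only the write is wrapped: unsubscribe and leave the loop.
            broker.unsubscribe(queue)
            break


async def demo():
    broker = Broker()
    response = OneShotResponse()
    task = asyncio.create_task(stream_topic(response, broker))
    await asyncio.sleep(0)  # give the task a chance to subscribe
    broker.publish("first")
    broker.publish("second")
    await task
    return response.written, len(broker.subscribers)
```

Wrapping only the write keeps errors from the broker side (for example in queue.get) from being mistaken for a closed client connection.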

Hi, I've also been trying to implement this and I am seeing a different behavior with that same example code above (with v0.7.0). I can get a stream established easily, but when the browser closes the socket no actual exception is thrown. Instead, every time it writes, I get on the console: socket.send() raised exception. but no actual exception is thrown that I can catch to break out of the loop. Any thoughts?

@karantza do you have an example of your code with this behavior, even if stripped down?

This problem seems to have stagnated. I'm going to close for now. We'll need to open a new ticket if it comes back up.

I found that SSE only worked behind nginx when it was configured as below. In particular, proxy buffering had to be off:

    location /sse/ {
        # sse won't work when buffering is on
        proxy_buffering off;
        chunked_transfer_encoding off;
        add_header 'Access-Control-Allow-Origin' 'https://www.example.org';
        add_header 'Access-Control-Allow-Credentials' 'true';
        proxy_pass http://127.0.0.1:8080;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_read_timeout 86400s;
        access_log off;
    }
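
As a possible complement to the nginx config above, nginx also honours an X-Accel-Buffering: no response header, which disables proxy buffering for that one response. A small sketch of headers the SSE route could send follows. The helper name sse_headers is made up, and passing it as stream(..., headers=sse_headers()) assumes that stream accepts a headers argument in the Sanic version in use.

```python
def sse_headers():
    """Response headers for an SSE route served behind nginx (hypothetical helper)."""
    return {
        # Ask intermediaries not to serve a cached copy of the stream.
        "Cache-Control": "no-cache",
        # Tells nginx not to buffer this response (same effect as proxy_buffering off).
        "X-Accel-Buffering": "no",
    }
```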