It would be awesome to have a feature for broadcasting messages to all clients connected to a specific WebSocket endpoint.
This function could be called inside an API endpoint. For example, after a POST method that changes some persistent data, all clients connected at /ws would receive a message telling them that it is time to refresh.
Cheers guys, this is an incredible project. I like it so much!
@rudmac If I understand your feature request properly, I think it is actually possible without too much effort by making use of an async generator. Admittedly, I still find async generators a little mind bending (this is the first time I've ever had a use for one "in the wild"), but I think seeing it in action makes it a little easier. Here's an implementation that makes a small extension of the tutorial app from https://fastapi.tiangolo.com/tutorial/websockets/ to add global push notifications:
from typing import List
from fastapi import FastAPI
from starlette.responses import HTMLResponse
from starlette.websockets import WebSocket, WebSocketDisconnect
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
    return HTMLResponse(html)


class Notifier:
    def __init__(self):
        self.connections: List[WebSocket] = []
        self.generator = self.get_notification_generator()

    async def get_notification_generator(self):
        while True:
            message = yield
            await self._notify(message)

    async def push(self, msg: str):
        await self.generator.asend(msg)

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.append(websocket)

    def remove(self, websocket: WebSocket):
        self.connections.remove(websocket)

    async def _notify(self, message: str):
        living_connections = []
        while len(self.connections) > 0:
            # Looping like this is necessary in case a disconnection is handled
            # during await websocket.send_text(message)
            websocket = self.connections.pop()
            await websocket.send_text(message)
            living_connections.append(websocket)
        self.connections = living_connections


notifier = Notifier()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await notifier.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        notifier.remove(websocket)


@app.get("/push/{message}")
async def push_to_connected_websockets(message: str):
    await notifier.push(f"! Push notification: {message} !")


@app.on_event("startup")
async def startup():
    # Prime the push notification generator
    await notifier.generator.asend(None)
If you want to test it out, copy the above to a file main.py and start the server:
uvicorn main:app --reload
Then, open a few tabs at http://localhost:8000/ and send some chat messages (this should work the same as the base tutorial app). Then open http://localhost:8000/push/hello%20world and you should receive a push notification in each of your open tabs showing the message hello world.
@tiangolo I'm a really big fan of this websockets integration, truly awesome stuff.
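For anyone puzzling over the priming step in the startup handler, here is a minimal sketch of the same pattern using only the standard library. The names `notification_generator` and `delivered` are made up for illustration, and a plain list stands in for the connected websockets.

```python
import asyncio


async def notification_generator(delivered):
    # Each value passed to asend() becomes the result of this `yield`.
    while True:
        message = yield
        delivered.append(message)  # stands in for notifying the websockets


async def main():
    delivered = []
    generator = notification_generator(delivered)
    # Prime the generator: run it up to the first `yield` so it can accept values.
    await generator.asend(None)
    await generator.asend("first")
    await generator.asend("second")
    await generator.aclose()
    return delivered


result = asyncio.run(main())
print(result)
```

The first `asend(None)` only advances the generator to its first `yield`; every later `asend(value)` resumes it with that value and runs one pass of the loop.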
Cheers guys, this is an incredible project. I like it so much!
@rudmac I'm glad to hear that!
@dmontagu That's very interesting! And yes, it's mind-bending.
This should go in a blog post, a tutorial, a package or something :clap:
@dmontagu Very interesting approach. This is what I'm looking for. Thank you so much! I will let you know after I implement this in my real application.
@dmontagu When I try this, I get the error AttributeError: 'FastAPI' object has no attribute 'websocket' on the @app.websocket("/ws") decorator. Any clues why?
I implemented your code, @dmontagu, and it works like a charm. Now I can broadcast a message to all web clients telling them it's time to refresh right after writing some data to the database. The old code was always polling for new data with queries, wasting resources.
Thank you bro! Cheers!
@SimonDahrs I would guess that you just need to upgrade to FastAPI >= 0.24.0 (pip install "fastapi>=0.24.0" should do it). The websockets capability was added less than a week ago!
@dmontagu Thanks, that did the trick! Didn't know it was that new, cool!
Great, thanks for all the help @dmontagu! ...I look forward to your blog-post :wink:
I think the question is now answered, so I'll close this issue now. But feel free to add more comments or create new issues.
Just to mention, the very awesome @dmontagu solution works only with uvicorn. Has anyone had the same problem as me?
For production with gunicorn and a bunch of _workers_, I implemented, on top of the @dmontagu solution, a Redis pub/sub message that tells all workers to broadcast a message to all of their connected websocket clients. I did that because when I hit /push/{message}, only the websocket clients connected to that worker received the message.
I was already using Redis, so pub/sub made my day. Maybe without Redis and a bit more code you can achieve the same result.
Cheers!
@rudmac Yes, you would need to modify this to work with multiple workers; it would probably have to involve each worker subscribing to some single master notification source (the handling of those notifications could also be done via async generators). Presumably redis (or similar) can handle this. But I doubt that it could be done without relying on an external service given that different workers will all be running in different processes.
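To make the multi-worker idea concrete, here is a rough sketch that simulates it inside a single process. It is not Redis code: the `Channel` class is a hypothetical in-memory stand-in for an external pub/sub service, `Worker` stands in for one server process, and a list stands in for each worker's websocket connections.

```python
import asyncio


class Channel:
    """In-memory stand-in for an external pub/sub channel (hypothetical)."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, message):
        # Every subscriber gets its own copy of the message.
        for queue in self.subscribers:
            await queue.put(message)


class Worker:
    """Simulates one server process with its own local connections."""

    def __init__(self, name, channel):
        self.name = name
        self.channel = channel
        self.inbox = channel.subscribe()
        self.received = []  # stands in for websocket.send_text() calls

    async def listen(self):
        # Relay every published message to this worker's local clients.
        while True:
            message = await self.inbox.get()
            if message is None:  # sentinel used only to stop this demo
                break
            self.received.append(f"{self.name}: {message}")

    async def push(self, message):
        # A handler on any worker publishes instead of notifying directly.
        await self.channel.publish(message)


async def main():
    channel = Channel()
    workers = [Worker("w1", channel), Worker("w2", channel)]
    tasks = [asyncio.create_task(worker.listen()) for worker in workers]
    await workers[0].push("refresh")  # the request lands on one worker only
    await channel.publish(None)  # stop the listeners
    await asyncio.gather(*tasks)
    return [worker.received for worker in workers]


received = asyncio.run(main())
print(received)
```

The point of the design is that the push handler never touches local connections itself. It only publishes, and each worker's listener does the local fan-out, so clients on every worker see the message no matter which worker handled the request.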
Agreed, also, the more complex, distributed, Redis-based implementation deserves its own blog post. 😀🚀📝
@rudmac I've adapted @dmontagu's example to include a rabbitmq message broker. I _think_ this should solve the gunicorn problem since each instance will push messages to a globally maintained queue which is being consumed by the local notifier created on each instance.
Example here: just git clone and docker-compose up -d to test.
Feedback would be great as this is the first time I've used rabbitmq... happy to write this up into a blog post? :smiley:
@dmontagu Thank you for sharing this solution, it is not broadcast but it also solves my need.
@dmontagu Thanks for this solution!
@tiangolo Maybe it could be mentioned in the official docs?
https://github.com/encode/broadcaster
On Mon, 2 Mar 2020 at 3:09 PM, ivunchata wrote:
@dmontagu Thanks for this solution!
@tiangolo Maybe it could be mentioned in the official docs?
@dmontagu Thank you so much, you are a lifesaver! :) Your approach was awesome! It is truly amazing! Thanks once again! I agree with @ivunchata, this could be in the docs as well, for others.
@dmontagu Thank you so much, the implementation works like a charm! A blog post is necessary for this implementation.
connections: Dict[str, List[WebSocket]] = {}


class Notifier:
    def __init__(self, room_id: str):
        self.generator = self.get_notification_generator()
        self.room_id = room_id
        connections[room_id] = []

    async def get_notification_generator(self):
        while True:
            message = yield
            await self._notify(message)

    async def push(self, msg: str):
        await self.generator.asend(msg)

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        connections[self.room_id].append(websocket)

    def remove(self, websocket: WebSocket):
        connections[self.room_id].remove(websocket)

    async def _notify(self, message: str):
        living_connections = []
        while len(connections[self.room_id]) > 0:
            # Looping like this is necessary in case a disconnection is handled
            # during await websocket.send_text(message)
            websocket = connections[self.room_id].pop()
            await websocket.send_text(message)
            living_connections.append(websocket)
        connections[self.room_id] = living_connections


notifier = {
    "1": Notifier("1"),
    "2": Notifier("2")
}


@router.websocket("/ws/{room_id}")
async def websocket_endpoint(room_id: str, websocket: WebSocket):
    logging.info(msg=f"room_id = {room_id}")
    await notifier[room_id].connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        notifier[room_id].remove(websocket)


@router.get("/push/{room_id}/{message}")
async def push_to_connected_websockets(room_id: str, message: str):
    await notifier[room_id].push(f"! Push notification: {message} !")
Here is the error stack trace
backend_1 | ERROR:uvicorn:Exception in ASGI application
backend_1 | Traceback (most recent call last):
backend_1 | File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py", line 368, in run_asgi
backend_1 | result = await app(self.scope, self.receive, self.send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 58, in __call__
backend_1 | raise exc from None
backend_1 | File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 54, in __call__
backend_1 | await self.app(scope, inner_receive, inner_send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/fastapi/applications.py", line 171, in __call__
backend_1 | await super().__call__(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/applications.py", line 102, in __call__
backend_1 | await self.middleware_stack(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
backend_1 | raise exc from None
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
backend_1 | await self.app(scope, receive, _send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/base.py", line 25, in __call__
backend_1 | response = await self.dispatch_func(request, self.call_next)
backend_1 | File "./app/main.py", line 34, in db_session_middleware
backend_1 | response = await call_next(request)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/base.py", line 45, in call_next
backend_1 | task.result()
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/base.py", line 38, in coro
backend_1 | await self.app(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/cors.py", line 78, in __call__
backend_1 | await self.app(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
backend_1 | raise exc from None
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
backend_1 | await self.app(scope, receive, sender)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 550, in __call__
backend_1 | await route.handle(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
backend_1 | await self.app(scope, receive, send)
backend_1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
backend_1 | response = await func(request)
backend_1 | File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 199, in app
backend_1 | dependant=dependant, values=values, is_coroutine=is_coroutine
backend_1 | File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 149, in run_endpoint_function
backend_1 | return await dependant.call(**values)
backend_1 | File "./app/api/api_v1/endpoints/websockets.py", line 105, in push_to_connected_websockets
backend_1 | await notifier[room_id].push(f"! Push notification: {message} !")
backend_1 | File "./app/api/api_v1/endpoints/websockets.py", line 65, in push
backend_1 | await self.generator.asend(msg)
backend_1 | TypeError: can't send non-None value to a just-started async generator
backend_1 | DEBUG:uvicorn:('172.21.0.3', 39028) - Disconnected
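The TypeError at the bottom of the trace is what Python raises when a value is sent to an async generator that has not yet been advanced to its first `yield`. The original example primes its single generator in a startup handler, and the per-room code above shows no equivalent step, which is a likely cause. One possible fix, sketched here with the standard library only, is to let each notifier prime itself on its first push. `RoomNotifier`, `primed`, and `sent` are hypothetical names, and a list stands in for the room's websocket connections.

```python
import asyncio


async def bare_generator():
    while True:
        _ = yield


class RoomNotifier:
    def __init__(self):
        self.sent = []  # stands in for this room's websocket connections
        self.generator = self.get_notification_generator()
        self.primed = False

    async def get_notification_generator(self):
        while True:
            message = yield
            self.sent.append(message)

    async def push(self, msg):
        if not self.primed:
            # Advance the generator to its first `yield` before sending a value.
            await self.generator.asend(None)
            self.primed = True
        await self.generator.asend(msg)


async def main():
    # Reproduce the error: send a value to a generator that was never primed.
    try:
        await bare_generator().asend("hello")
        error = None
    except TypeError as exc:
        error = str(exc)

    # With lazy priming, notifiers created in a dict work without a startup hook.
    notifiers = {"1": RoomNotifier(), "2": RoomNotifier()}
    await notifiers["1"].push("hello room 1")
    return error, notifiers["1"].sent, notifiers["2"].sent


error, room_1, room_2 = asyncio.run(main())
print(error, room_1, room_2)
```

An alternative that stays closer to the original example would be to loop over the notifiers in a startup handler and call `asend(None)` on each generator there.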
@rsreevishal you might want to check https://github.com/encode/broadcaster
If that doesn't do it and you still have problems, please create a new issue following the template.
okay thanks :+1: