MQTT is used often in IoT. It is much lighter than HTTP and is build on top of TCP. It can also use a WebSocket transport layer.
I can think of the following options and would be interested to hear what you think.
Flask has support for MQTT routes, so I could follow the instructions here to implement Flask middleware, https://fastapi.tiangolo.com/advanced/wsgi/#using-wsgimiddleware, however I don't know if this works in general for any protocol (HTTP, MQTT, WS etc.) of if it only routes HTTP.
RabbitMQ has support for MQTT. But I'm unsure how I would integrate this with FastAPI. It would not be appropriate to use a background task, https://fastapi.tiangolo.com/tutorial/background-tasks/ because these are tasks that fired off after returning a response. Probably I would need to use an asyncio.Queue and call API methods manually when an event arrives.
paho-mqtt can use WebSocket transport. Again I'm unsure how to integrate this with FastAPI. Could I use the @websocket
decorator. Probably not because that would just be the transport layer, not the protocol layer!
@app.websocket("/sent_data") # PUT called by external client to send data here
async def recieved_data_mqtt_over_ws():
pass
Eventually something like this might be possible?
@app.mqtt("/sent_data") # PUT called by external client to send data here
async def recieved_data_mqtt_over_ws():
pass
I have not yet started to write code for this part of the project yet, but it鈥檚 on the horizon and thought I would try and get some advice sooner rather than later.
Great question
Also interested in the answer.
Interesting, I hadn't heard of it, I don't know how that protocol works but it seems it's more similar to WebSockets than to HTTP.
The solution would be quite involved, and actually would end up not related to FastAPI :shrug: but anyway, here are my ideas:
There would have to be an ASGI sub-specification for MQTT as there's one for HTTP and for WebSockets, I haven't seen one, but I see there are some packages that try to implement a layer for that (you can search "asgi mqtt").
But then you would need some type of server that supports that protocol, so, probably not Uvicorn, Hypercorn, etc. Maybe one of those packages directly.
Then there would be some code that handles that communication, maybe as part of one of those packages or independently.
Most of those packages seem to target Django Channels, maybe using them would be the simplest approach to handle it.
If it was for WebSockets, you could probably mount Django Channels in a FastAPI app, as both are ASGI. But as you need a new, custom, protocol that is not supported by most ASGI servers, you would probably end up needing a custom solution.
@tiangolo thanks! I will think about this when I return to this topic in the future.
@danieljfarrell Sorry for posting into this closed issue. However some point to note w.r.t. 2. Use RabbitMQ:
RabbitMQ ATM does support MQTT3.1.1 only. mosquitto supports v5.0 as well. According to the GitHub stars and watches the user base of rabbitmq-server (broker) is probably smaller than the user base of e.g. mosquitto which is backed by Eclipse which has a lot of adopters of their IoT projects (not MQTT related, however to be considered w.r.t. integration).
W.r.t. 3. Use MQTT over Websocket: In my experience paho-mqtt
works pretty well Django + channels + daphne (under the hood: twisted, autobahn, asgiref).
How is the situation here? I want to connect MQTT with my FastAPI server.. Is this possible ?
I didn鈥檛 make progress on this. Will post here if I do.
I've only read this code quickly so please forgive if not accurate. How many event loops are there running? Ideally there should be one per thread if using asyncio. This also might account for things not getting called.
UPDATE:: I somehow made this work. I can both connect and send message to websocket server and receive data from it. I can also connect to the broker, and send message to it. Another client which is also connected to the broker can see the messages. However, I can not see messages sent from one client getting printed in the terminal(which i can when i run this locally). But we are one step closer to solve this 100%.
To make this work it is important to expose to two different ports, otherwise it will not work. In my case i have my websocket: -p 5080:80 and the MQTT: -p 5079:79:
docker run -d -e PYTHONUNBUFFERED=1 --name another_one -p 5080:80 -p 5079:79 fastapiserver
Full code::
import sys
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
from typing import List
import logging
import asyncio
from hbmqtt.broker import Broker
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1
logger = logging.getLogger(__name__)
config = {
"listeners": {"default": {"type": "tcp", "bind": "0.0.0.0:79"}},
"sys_interval": 10,
"topic-check": {
"enabled": True,
"plugins": ["topic_taboo"],
},
}
broker = Broker(config)
async def startBroker():
await broker.start()
async def brokerGetMessage():
C = MQTTClient()
await C.connect("mqtt://0.0.0.0:79/")
await C.subscribe([("detteSkalV忙reKode/test", QOS_1)])
logger.info("Subscribed!")
try:
for i in range(1, 100):
message = await C.deliver_message()
packet = message.publish_packet
print(packet.payload.data.decode("utf-8"))
except ClientException as ce:
logger.error("Client exception : %s" % ce)
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
app = FastAPI()
background_task_started = False
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
print("SOMEBODY CONNECTED")
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
@app.on_event("startup")
async def startup_event():
await broker.start()
@app.on_event("shutdown")
async def shutdown_event():
await broker.shutdown()
Hello there, I know the issue is closed, but we have good news, we have worked on it, created an extension for FastApi. FastApi-MQTT is now released with its features. It is a wrapper around gmqtt, we documented every single point. We think are progressed, but if you face any issues pls let me know. Feel free to open issues or submit PR.
fastapi-mqtt: https://github.com/sabuhish/fastapi-mqtt
Good job!
Does your MQTT server run in the same thread as FastAPI? Does it have access to the same run loop? Or is it more less running independently?
@danieljfarrell Exactly it is the same thread as FastAPI uses which is the main thread, we don't create a new event loop.
@danieljfarrell Updating my answer: only publish, unsubscribe methods are used in separate threads because there were blocking calls. Which would have blocked the main thread and prevented the event loop from running.
I see, could they be made coroutines for full async?
Hello there, I know the issue is closed, but we have good news, we have worked on it, created an extension for FastApi. FastApi- MQTT is now released with its features. It is a wrapper around gmqtt, we documented every single point. We think are progressed, but if you face any issues pls let me know. Feel free to open issues or submit PR.
fastapi-mqtt: https://github.com/sabuhish/fastapi-mqtt
@sabuhish Thanks a lot for fastapi-mqtt! May I ask why you've choosen gmqtt
as client lib? The library has not many contributors and according to github stars not many adopters. Are all features of MQTT v3.1.1 supported? Is LWT (last will and testament) supported? In case you don't know the feature here is a short "code level" intro. Sorry for beeing nitpicking but there is e.g. eclipse/paho.mqtt.python which has way more features.
I see, could they be made coroutines for full async?
@danieljfarrell I wish I could, I am following up with gmqtt, I hope to achieve this as well, but it is core was not coroutines. There is work in progress in the module itself not merged yet.
@sabuhish i think it's a difficult problem, to integrate the event loop with an existing non-async library. I see various attempts at asyncio + MQTT, with varying levels of success. Take a look here https://pypi.org/project/asyncio-mqtt/ and at the list of similar projects at the bottom. Maybe one of them might already have what you need?
Hello there, I know the issue is closed, but we have good news, we have worked on it, created an extension for FastApi. FastApi- MQTT is now released with its features. It is a wrapper around gmqtt, we documented every single point. We think are progressed, but if you face any issues pls let me know. Feel free to open issues or submit PR.
fastapi-mqtt: https://github.com/sabuhish/fastapi-mqtt@sabuhish Thanks a lot for fastapi-mqtt! May I ask why you've choosen
gmqtt
as client lib? The library has not many contributors and according to github stars not many adopters. Are all features of MQTT v3.1.1 supported? Is LWT (last will and testament) supported? In case you don't know the feature here is a short "code level" intro. Sorry for beeing nitpicking but there is e.g. eclipse/paho.mqtt.python which has way more features.
@fkromer Thank you too. Well, gmqtt library was one of the successful library among others for being an async client for MQTT. It has been updated till now, other libraries did not follow up latest development. Library that fully implements the MQTT protocol MQTTv311 and MQTTv50. Library paho is not async client which would not be good case for FastApi.
@sabuhish i think it's a difficult problem, to integrate the event loop with an existing non-async library. I see various attempts at asyncio + MQTT, with varying levels of success. Take a look here https://pypi.org/project/asyncio-mqtt/ and at the list of similar projects at the bottom. Maybe one of them might already have what you need?
cess.
@danieljfarrell As far as I remember before writing the module we discussed the possibility of this module as well but was a long time ago why skipped. I will take look at our notes, will be back.
@sabuhish I've not used paho
in async mode yet. But there should be support for async mode as well. At least there is connect_async()
and examples/loop_asyncio.py
.
@sabuhish i think it's a difficult problem, to integrate the event loop with an existing non-async library. I see various attempts at asyncio + MQTT, with varying levels of success. Take a look here https://pypi.org/project/asyncio-mqtt/ and at the list of similar projects at the bottom. Maybe one of them might already have what you need?
@danieljfarrell Sorry for being late. Thank you for your question, it took me back in the time when I first met with MQTT :) Last year in the summer, I got a project, data transfer from the web application to device and receiving data from it (STM-32). Paho is the most popular library which is written by Eclipse Foundation and Eclipse has many more libraries for MQTT across many languages. I needed to something in real-time with background scheduler tasks. When we got introduced to MQTT, we dived into many different libraries in python language for django framework. We analyzed paho, hbmqtt, aiomqtt, gmqtt, and others. Among them, we mostly emphasized on paho and tested worked fine. We developed django with paho, celery, and django-channles together. We were succeded. The reason why we chose paho, is because paho was up to date and maintained, had fewer issues in comparison. At that time I did really intend to dive more into asyncio, at the same I followed gmqtt itself to see what is happening. Paho was working quite nicely with more than 100 devices and receiving data in every 3 seconds from each device. Gmqtt was interesting because it had its own implementations and was built on the asyncio library it was in the early stages of development at that time. On the other hand, it was one of the Python libraries which early support MQTT 5.0. From the development team, Elena had talked about the client at Pycon Belarus. It is in the Russian language, unfortunately, I believe translation is possible. PyConBy: Elena Nikolaichik, MQTT with Python. Regarding to benchmarking results for clients, it showed up to the library was almost the same results with paho, and leaves behind others. Benchmarking. This year we took a look again at the libraries checked analyzed all possible solutions. There were around five libraries we took into account. We started with hmbqtt, realized has many issues, and outdated skipped it. Aiomqtt could not integrate paho's event loop into asyncio. Asyncio-mqtt was not reliable had some critics issues and it was based on an example on paho's repo. The reason why we were keen on it, because it was built on asyncio libray. Gmqtt was working already with fewer issues and maintained. As Elana said in production the library currently operates with million of payloads transferred between clients.
In regards to your answer, coroutines for full async:
Methods that we wrote does not need to internally have to be fully async if operations are not I/O bound and they are simple operations we don't require to make async. The main library that we use, is developed on top of asyncio. We also simplified it by writing the fastapi-mqtt. The reason for moving two methods into run_in_executer method is because for fastapi users could be easily used inside async-await functions.
@sabuhish I've not used
paho
in async mode yet. But there should be support for async mode as well. At least there isconnect_async()
andexamples/loop_asyncio.py
.
@fkromer I have noted this in the above.
@sabuhish wow how detailed, you have done a lot of great work getting it this far. And you are quiet right! Not everything needs to be async, just the methods which are IO bound. I think as long as it integrates with the main thread's event loop and is otherwise threadsafe. Good job! I think this is the case from what you wrote already.
@danieljfarrell Thank you for following up and make the discussion it was great to talk. With my teammates @Turall @AliyevH we will follow issues in case any, will try to do our best as much as possible.
Thanks @sabuhish . I had subscribed to this thread hoping for a solution to using MQTT with FastAPI :).
@mntolia We have achieved the solution, you can use it with FastApi. feel free to open issues, and discuss if you have any problems. fastapi-mqtt-issues
@mntolia We have achieved the solution, you can use it with FastApi. feel free to open issues, and discuss if you have any problems. fastapi-mqtt-issues
Hello @sabuhish, I am trying to put IOT events in my database.
I am using APIRouter and I have already made a route for inserting events in my DB and I am able to do it through the /docs link.
I am also able to connect to broker and subscribe to topic but I am not able to insert it in by DB.
I tried to use python requests but the API was not able to make POST request to itself.
I tried the code below and it says that : 'Depends' object has no attribute 'add'
@mqtt.on_message()
async def message(client, topic, payload, qos, properties, db :Session = Depends(get_db)):
data_list = payload.decode().split()
data = {'fduid':'FDU-'+topic[-6:],
'evnty':data_list[0],
'tstmp':data_list[1],
'locur':data_list[2]}
print(data)
return create_event_mqtt(db=db, event=data)
Event Create function ::
def create_event_mqtt(db :Session, event :event_schema.EventCreate):
db_event = Evntlg( fduid = event['fduid'], evnty = event['evnty'],
tstmp = event['tstmp'], locur = event['locur'] )
db.add(db_event)
db.commit()
db.refresh(db_event)
return db_event
Originally my idea was to import the mqtt object into the event_router file and use it in the create_event function.
But for some reason it says : Cannot import 'mqtt'
I am doing this for my final college project and I have to say, fastapi-mqtt came just in time for my API implementation. I just need a _little_ more help here, please.
::: UPDATE :::
I have figured a work-around, I've written a paho-mqtt client in another python file and that executes in another terminal.
So now I have fastapi in one and paho-mqtt in another terminal. The paho-mqtt script just takes the mqtt message and does a requests.post()
on the API link.
It works but it's kind of a hack. It would be so much better if I could catch the mqtt message and insert into database all in fastapi itself.
@wiserdivisor it will good idea to open an issue in the module itself rather than writing here. Subscribers of this thread will be annoyed getting mails and notifications plus if it is on the module others can also gain some practice instead of posting and discussing here being away from the module.
fastapi-mqtt-issues
Most helpful comment
Hello there, I know the issue is closed, but we have good news, we have worked on it, created an extension for FastApi. FastApi-MQTT is now released with its features. It is a wrapper around gmqtt, we documented every single point. We think are progressed, but if you face any issues pls let me know. Feel free to open issues or submit PR.
fastapi-mqtt: https://github.com/sabuhish/fastapi-mqtt