Fastapi: [QUESTION] Integration with Faust (Python Streaming library)

Created on 25 Jul 2019 · 12 Comments · Source: tiangolo/fastapi

Description

I am currently investigating ways to integrate with Faust, and was wondering whether anyone in the community already has experience setting up a simple FastAPI service with Faust integrated.

On a side note,
Great project, really hope it won't get abandoned/deprecated 💪

question

All 12 comments

So the main problem I've encountered is that your FastAPI app (served by Uvicorn) and your Faust app run on independent event loops, so any AGENT.send(...) call you make from the FastAPI app will fail with something like this:

RuntimeError: Task ... running at ... got Future <Future pending> attached to a different loop
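
A minimal, standard-library-only sketch of how this kind of error arises, assuming nothing about Faust or Uvicorn internals: a future created on one loop is awaited by a task running on another. The helper name and the two loop names are hypothetical and only stand in for the two apps.

```python
import asyncio


def await_foreign_future() -> str:
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()  # stands in for the Faust app's loop
    loop_b = asyncio.new_event_loop()  # stands in for the Uvicorn/FastAPI loop
    foreign_future = loop_a.create_future()

    async def waiter():
        # The task runs on loop_b, but the future belongs to loop_a.
        await foreign_future

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return "no error"


message = await_foreign_future()
print(message)
```

The printed message ends with "attached to a different loop", which is why sharing a single loop between both apps (or crossing loops only through thread-safe calls) avoids it.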

Right now I'm solving it by defining the following Uvicorn class in uvicorn/uvicorn/main.py

# Assumes this is defined inside uvicorn/main.py, where Config, Server,
# StatReload and Multiprocess are available.
class Uvicorn:
    def __init__(self, app, **kwargs):
        self.app = app
        self.config = Config(app, **kwargs)
        self.server = Server(config=self.config)

        if self.config.reload and not isinstance(app, str):
            self.config.logger_instance.warning(
                "auto-reload only works when app is passed as an import string."
            )

    def get_event_loop(self):
        # Expose the server's loop so another library (here, Faust) can share it.
        return self.server.get_event_loop()

    def run(self):
        if isinstance(self.app, str) and (self.config.debug or self.config.reload):
            # Reload mode: a supervisor restarts the server on file changes.
            sock = self.config.bind_socket()
            supervisor = StatReload(self.config)
            supervisor.run(self.server.run, sockets=[sock])
        elif self.config.workers > 1:
            # Multi-worker mode: all workers share one bound socket.
            sock = self.config.bind_socket()
            supervisor = Multiprocess(self.config)
            supervisor.run(self.server.run, sockets=[sock])
        else:
            self.server.run()

and then doing something like the following

from fastapi import FastAPI
from uvicorn import Uvicorn
from faust import App

app = FastAPI()
uvicorn = Uvicorn(app, ...)
faust_app = App(..., loop=uvicorn.get_event_loop())

if __name__ == '__main__':
    uvicorn.run()

Let me know if this helps!

Thanks for the response @gitavi !

Yes, this is the main issue here. It seems like it should still be easy to run them on a single event loop, since Faust supports uvloop and FastAPI is served by Uvicorn, which is based on uvloop. Another option would be to simply run them as separate services.

Regarding your example,
Can you provide more details on how exactly you start the Uvicorn server?

Are you simply running something like:

python main.py

Does this somehow start both FastAPI and the Faust consumer, or do you also start Faust separately while they still share the same event loop?

Here's a minimal example for you and we can figure out why it's not working together :)

requirements.txt

faust[fast,uvloop]
fastapi
-e git+https://github.com/gitavi/uvicorn.git@25960d12301339301482c299ee81937c889a1bb7#egg=uvicorn

main.py

from uvicorn import Uvicorn
from faust import App as Faust
from fastapi import FastAPI
import asyncio
import os
os.environ['PYTHONASYNCIODEBUG'] = '1'


fastapi_app = FastAPI()

uvicorn = Uvicorn(fastapi_app, host="127.0.0.1", port=5000)#, loop='asyncio')

faust_app = Faust(
    "presidents-app",
    broker="aiokafka://localhost:9092",
    loop=uvicorn.loop,
)

@fastapi_app.on_event('startup')
def start_faust_app():
    asyncio.ensure_future(faust_app.start(), loop=uvicorn.loop)


if __name__ == '__main__':
    uvicorn.run()

Running this, you will get RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one, whether you use Uvicorn's default loop (uvloop) or select asyncio via the loop argument. These errors only surface because we set os.environ['PYTHONASYNCIODEBUG'] = '1'. Without it, both loops seem to work fine, but whatever is happening is clearly not thread-safe.
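
For readers unfamiliar with that debug-mode check, here is a small standard-library sketch of what triggers it. It does not reproduce the Faust/Uvicorn setup above. It only assumes a plain asyncio loop running in one thread while another thread calls into it, and the function name is hypothetical.

```python
import asyncio
import threading


def call_from_other_thread():
    loop = asyncio.new_event_loop()
    loop.set_debug(True)  # enables asyncio debug mode for this loop
    running = threading.Event()

    def run_loop():
        asyncio.set_event_loop(loop)
        loop.call_soon(running.set)  # fires once the loop is actually running
        loop.run_forever()

    thread = threading.Thread(target=run_loop)
    thread.start()
    running.wait(timeout=5)

    # In debug mode, a plain call_soon from a foreign thread is rejected.
    try:
        loop.call_soon(lambda: None)
        error = None
    except RuntimeError as exc:
        error = str(exc)

    # call_soon_threadsafe is the supported way to cross threads.
    delivered = threading.Event()
    loop.call_soon_threadsafe(delivered.set)
    ok = delivered.wait(timeout=5)

    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
    return error, ok


error, delivered_ok = call_from_other_thread()
print(error)
```

Without debug mode the first call would be accepted silently, which matches the observation that things "seem to work fine" once the environment variable is removed.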

I think the underlying issue is on the Faust side and has something to do with the AIOKafkaConsumer not using the loop given to the Faust app at initialization, but I haven't dug into it yet.

Let me know what you think.

@gitavi thank you for the example!

I agree with the assumption that it might be an issue in the Faust codebase.

I have opened a similar issue on their repository as well and referenced your examples. Let's wait for some responses from their community :-)

So I brought this up in the Faust slack around a week ago and it turns out that Faust is thread safe despite this warning so this example should get everyone going on their FastAPI/Faust projects :).

Also, I haven't actually tried doing something like asyncio.ensure_future(faust_app.start(), loop=asyncio.get_event_loop()) within the @fastapi_app.on_event('startup') function, since I'm not sure whether Uvicorn has started the loop at that point. If it has, that would remove the need for a portable Uvicorn class that saves the loop for Faust to access (my personal Uvicorn fork has this portable class if you don't want to implement it yourself).
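
The mechanism behind that untried idea can be sketched with the standard library alone. This assumes the startup hook is invoked while the server's loop is already running, which is exactly the open question above. StubWorker, on_startup and serve are hypothetical stand-ins, not Faust, FastAPI or Uvicorn APIs.

```python
import asyncio


class StubWorker:
    """Hypothetical stand-in for a Faust app; records the loop it started on."""

    def __init__(self):
        self.loop = None

    async def start(self):
        self.loop = asyncio.get_running_loop()


async def on_startup(worker):
    # The server's loop is already running here, so the background task is
    # scheduled on that same loop without passing a loop handle around.
    return asyncio.ensure_future(worker.start())


async def serve(worker):
    """Hypothetical stand-in for the server invoking its startup hook."""
    task = await on_startup(worker)
    await task
    return asyncio.get_running_loop()


worker = StubWorker()
server_loop = asyncio.run(serve(worker))
print(worker.loop is server_loop)
```

If the hook does run on the live loop, the worker and the server end up on one loop and no custom wrapper class is needed to share it.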

@gitavi how should we proceed with the issue? Is there a PR opened with your changes to the main repo?

If you can make the PR, I can also reference it in the Faust repository and open a PR proposing a simple FastAPI integration example for the Faust docs.

@gitavi, any updates on this? 🙂

@gitavi I have opened a PR with your changes on Uvicorn repository - https://github.com/encode/uvicorn/pull/442
Let's see what maintainers will suggest to change.

Thanks everyone for the discussion here!

Faust is a data processing system, so I'm assuming that what you want is to receive requests in your API built with FastAPI and, from them, send jobs to Kafka to be executed by Faust workers. In that case, those Faust workers should run as separate processes, probably even on a distributed cluster of machines, not in the same process as your FastAPI app.

If you run it in the same process as your FastAPI app, you are not really gaining anything from Faust. In that case you would probably be better off with the integrated background tasks that are simpler to handle.

But if you have a heavy workload and you need to distribute the data processing, you should run them separately.

Faust looks great, but I don't see anything in their docs about _sending_ the messages/jobs to Kafka other than the command line. So you would probably need to add a Kafka Python client, to _send_ data to Kafka (not to read from it as a worker).

But in that case, if your use case is not as heavy as to require a full Kafka setup, it might be simpler/easier to just use ARQ with Redis.

Assuming the original issue was solved, it will be automatically closed now. But feel free to add more comments or create new issues.

Faust uses sinks to write to other Kafka topics; you can specify a list of sinks in the agent decorator. By the way, great work on FastAPI!

Faust actually does have multiple ways to send messages to Kafka programmatically from Python. You can check their docs, or look at this example I wrote up here: https://github.com/toh995/fastapi-faust-example

The README for the repo has a high level explanation of the architecture, but basically I created separate docker containers for the faust worker and the API.

Comments and criticism are always welcome!
