FastAPI: Why does middleware wait for all scheduled tasks to finish?

Created on 23 Sep 2020 · 16 Comments · Source: tiangolo/fastapi

Description

I schedule tasks on the event loop like this:

    async def wait_to_finish():
        time.sleep(5)
        print('finished')

    @router.get("/async/background")
    async def get() -> bool:
        asyncio.create_task(wait_to_finish())
        return True

This works well: True is returned as the response without waiting for wait_to_finish() to finish, while wait_to_finish() runs in the event loop and prints "finished". Everything works up to this point.

But when I add a middleware like this:

async def catch_exceptions_middleware(request: Request, call_next):
    try:
        return await call_next(request)
    except Exception as exc:
        logger.exception("Exception")
        return JSONResponse(status_code=getattr(exc, "status_code", 500),
                            content={"message": "Unexpected error occurred."})

app.middleware('http')(catch_exceptions_middleware)

It waits for all the tasks I created in the event loop to finish before sending the response, so in this example the response takes 5+ seconds. Why do middlewares force the event loop to finish all the tasks? How can I avoid this? (Without using background tasks: there are many reasons I don't use them. And remember, this works well until I add the middleware shown above.)

Environment

  • OS: macOS
  • FastAPI Version : 0.61.0
  • Python version: 3.8.5
question

All 16 comments

There isn't much asynchronous work in your code. You are using time.sleep(), and when it is called it blocks the entire execution of your program, not just the coroutine that called it. Use asyncio.sleep() instead: it is non-blocking and suspends only the coroutine that awaits it.

Actually, it works when I remove the middleware. That means time.sleep() runs asynchronously in the background, so the response is sent immediately. The only change here is adding the new middleware.

No, time.sleep() can't run asynchronously even when it runs inside an event loop. That is the most fundamental difference between sync and async. If there were no difference, why would we need asyncio.sleep()?

Here is an example for you.

import asyncio

import time


async def wait_to_finish_sync():
    print("starting sync")
    start = time.time()
    time.sleep(5)
    end = time.time()
    print(f"finished time.sleep() in {end-start }ms")


async def wait_to_finish_async():
    print("starting async")
    start = time.time()
    await asyncio.sleep(1)
    end = time.time()
    print(f"finished asyncio.sleep() in {end-start } ms")


async def main():
    await asyncio.gather(wait_to_finish_sync(), wait_to_finish_async())

asyncio.run(main())

main() runs wait_to_finish_sync first, then wait_to_finish_async:

starting sync
finished time.sleep() in 5.003834247589111ms
starting async
finished asyncio.sleep() in 1.0015385150909424 ms

You can see it blocked the entire execution.

But when we change main() to run wait_to_finish_async first:

async def main():
    await asyncio.gather(wait_to_finish_async(), wait_to_finish_sync())

You can see that it starts the asyncio one, then moves on to the next without waiting for it to finish. But when time.sleep() is called, it blocks the entire execution again, so both end at the same time:

starting async
starting sync
finished time.sleep() in 5.003143310546875ms
finished asyncio.sleep() in 5.003296613693237 ms

There is nothing wrong with adding a middleware; you are just running a blocking function in a coroutine. Keep in mind that there is no such thing as "background" in asyncio. Tasks do run independently, but they all run inside the same event loop, so a blocking function blocks the event loop as well.
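As a rough illustration of the point above, here is a minimal sketch of one way to keep the event loop free when a blocking call cannot be replaced: hand it to a thread with loop.run_in_executor(). The names blocking_work and heartbeat are hypothetical helpers invented for this sketch, not part of the original code.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    # Hypothetical stand-in for any blocking call, such as time.sleep().
    time.sleep(seconds)
    return "blocking work done"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    # Records a timestamp on every tick; it can only tick while the loop is free.
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    start = time.monotonic()
    beat = asyncio.create_task(heartbeat(ticks, 0.05, 4))
    # run_in_executor runs the blocking call in the default thread pool,
    # so the event loop can keep scheduling other coroutines meanwhile.
    result = await loop.run_in_executor(None, blocking_work, 0.5)
    await beat
    return result, ticks[0] - start


if __name__ == "__main__":
    result, first_tick = asyncio.run(main())
    print(result, f"(first heartbeat after {first_tick:.2f}s)")
```

If time.sleep() were called directly inside a coroutine instead, the first heartbeat could not fire until the sleep had finished.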

Thank you for that basic explanation, but it is not my main concern. Here is an example of the pattern I am interested in:

import asyncio
import time


async def wait_to_finish_sync():
    print("starting sync")
    start = time.time()
    time.sleep(5)
    end = time.time()
    print(f"finished time.sleep() in {end-start }ms")


async def wait_to_finish_async():
    print("starting async")
    start = time.time()
    await asyncio.sleep(3)
    end = time.time()
    print(f"finished asyncio.sleep() in {end-start } ms")


async def main():
    task1 = asyncio.create_task(wait_to_finish_sync())
    task2 = asyncio.create_task(wait_to_finish_async())
    print("tasks scheduled immediately: ", task1, task2)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

Notice how print("tasks scheduled immediately: ", task1, task2) is run by the event loop immediately and the tasks run in the background, even though time.sleep() blocks asyncio.sleep() as you described. That is not my concern here, because I got what I wanted: the print() call executed immediately. One can easily use asyncio.sleep() instead of time.sleep() to keep the tasks from blocking each other, but that makes no difference to scheduling them as tasks that run in the background after the function (main() here) returns.

Thanks again for the good explanation.
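The fire-and-forget pattern described above can be sketched with a non-blocking sleep as follows. This is only an illustration under stated assumptions: handler, slow_job, and background_tasks are hypothetical names. The set of strong references is there because the event loop itself keeps only weak references to tasks, so an unreferenced task may be garbage-collected before it finishes.

```python
import asyncio

# Strong references to in-flight tasks (hypothetical module-level registry).
background_tasks: set = set()


async def slow_job(log: list) -> None:
    await asyncio.sleep(0.2)  # non-blocking, unlike time.sleep()
    log.append("job finished")


async def handler(log: list) -> bool:
    # Schedule the job and return without awaiting it.
    task = asyncio.create_task(slow_job(log))
    background_tasks.add(task)
    # Drop the reference once the task is done.
    task.add_done_callback(background_tasks.discard)
    log.append("handler returned")
    return True


async def main() -> list:
    log: list = []
    await handler(log)
    # Keep the loop alive long enough for the scheduled job to finish.
    await asyncio.gather(*background_tasks)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The handler returns before the job completes, which is the behavior the question relies on.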

Actually, I solved the issue by adding the middleware like this:

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from loguru import logger
from starlette.middleware import Middleware


class GlobalExceptionHandlerMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        try:
            return await self.app(scope, receive, send)
        except Exception as ex:
            logger.exception("Exception")

            response = JSONResponse(status_code=getattr(ex, "status_code", 500),
                                    content={"message": "Unexpected error occurred."})
            await response(scope, receive, send)

app = FastAPI(
    middleware=[Middleware(GlobalExceptionHandlerMiddleware)],
)

It behaves the way I described above.
I think this is the difference between HTTP middleware and ASGI middleware: HTTP middleware waits for all async jobs to finish.
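To make the pure ASGI behavior concrete, here is a dependency-free sketch. It assumes a hand-written driver in place of a real server such as uvicorn, and PassThroughMiddleware, make_app, and background_job are hypothetical names. It shows only that a pure ASGI middleware returns as soon as the wrapped app has sent its response, without waiting for tasks the app scheduled. It does not reproduce what BaseHTTPMiddleware does internally.

```python
import asyncio


class PassThroughMiddleware:
    # Pure ASGI middleware: wraps the app and awaits it, nothing more.
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        await self.app(scope, receive, send)


def make_app(events: list):
    async def background_job():
        await asyncio.sleep(0.2)
        events.append("background job finished")

    async def app(scope, receive, send):
        # Schedule work, then send the response without awaiting the task.
        scope["state"]["task"] = asyncio.create_task(background_job())
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"true"})

    return app


async def main() -> list:
    events: list = []
    scope = {"type": "http", "method": "GET", "path": "/", "state": {}}

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # Record each ASGI message instead of writing to a socket.
        events.append(message["type"])

    await PassThroughMiddleware(make_app(events))(scope, receive, send)
    events.append("middleware returned")
    await scope["state"]["task"]  # let the scheduled task finish
    return events


if __name__ == "__main__":
    for event in asyncio.run(main()):
        print(event)
```

In this sketch both response messages are recorded and the middleware returns before the background job reports completion.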

Ah, I see. But this case made me curious: as far as I can see, you are writing a middleware just for handling 500s. Is there a specific reason to use a middleware instead of an exception handler?

I think exception handlers are mostly for 400 errors? And I feel like it would be weird to wrap the entire request with an exception handler.

@ArcLightSlavik Yes, we usually use exception handlers for client errors (4xx), but that does not mean server errors (5xx) should not be handled by an exception handler. It is still an error, and it should be handled by an exception handler, not by a middleware. Starlette handles it with an exception handler, and so do Flask and others.

And I feel like it would be weird to wrap the entire request with an exception handler.

We do not need to wrap the entire request. That is the point: a middleware runs for every request, which is unnecessary here. Again, that is the exception handler's job: when a server error is raised, the exception handler handles it. Writing a middleware to handle exceptions is inefficient and outside the purpose of middleware.

That starlette solution looks neat. My concern was that you would have to modify every request (maybe add a decorator or something) but seems that's not required.

@ArcLightSlavik Isn't it the opposite? A middleware runs for every request, but an exception handler only runs when a 500 is raised.

@ycd At runtime, yes. But I meant whether you would need to add any code to the endpoints. I was thinking that with exception handlers you would need to do something like this (you don't, that was just my assumption):

@app.get('/request')
@handle_error_500

Ah, I see. But this case made me curious: as far as I can see, you are writing a middleware just for handling 500s. Is there a specific reason to use a middleware instead of an exception handler?

I tried exception handlers, and it still logs ERROR: Exception in ASGI application, which I don't want. And in debug mode, it doesn't work.

I use this middleware usually:

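from typing import Callable

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response


class ServerErrorMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        response = Response("Internal server error", status_code=500)
        try:
            response = await call_next(request)
        except Exception:
            # `logger` is assumed to be configured elsewhere (e.g. loguru or logging).
            logger.exception(f"Request {request.url} failed.")
        return response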

@Kludex Doesn't using a middleware that inherits from BaseHTTPMiddleware break BackgroundTasks?

See Starlette Issues 919

That's pretty disappointing... Thank you @ycd
