Description
How can I [...]?
Is it possible to [...]?
It's not strictly speaking a FastAPI question, but here are some options; find what works best for you.
The best, without any doubt :grin: https://linux.die.net/man/5/crontab
Some Python tools:
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
https://docs.python.org/3/library/sched.html
https://github.com/dbader/schedule
https://github.com/Bogdanp/dramatiq/tree/master/examples/scheduling
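For example, the `schedule` package runs jobs from a plain loop; a minimal sketch (the job body and the time are placeholders):

```python
import time

import schedule


def nightly_job():
    # placeholder for whatever the nightly batch does
    print('running the nightly batch...')


# run the job every day at 02:00
schedule.every().day.at('02:00').do(nightly_job)

while True:
    schedule.run_pending()
    time.sleep(60)
```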
Yes, I know those methods, but they don't work with an async function.
Maybe https://github.com/aio-libs/aiojobs ? I've never tried it.
APScheduler has an asyncio implementation https://apscheduler.readthedocs.io/en/latest/
There is an example in their repo: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/asyncio_.py
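Their asyncio example boils down to roughly this (a sketch; check the linked example for the current API):

```python
import asyncio
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler


def tick():
    # runs every 3 seconds, scheduled on the asyncio event loop
    print('Tick! The time is:', datetime.now())


scheduler = AsyncIOScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
```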
Hey, I am able to run the background task, but my uvicorn server has stopped. How can I run both simultaneously?
Thus far, the best task queue to use w/ FastAPI has been @samuelcolvin's Arq. It's really good. You should use it.
https://github.com/samuelcolvin/arq
He's also the guy behind Pydantic.
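For reference, a minimal Arq setup looks roughly like this (a sketch based on Arq's README; the task name and default Redis settings are illustrative):

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def say_hello(ctx, name):
    # an async task; ctx is the worker context dict
    print(f'Hello {name}')


class WorkerSettings:
    # run the worker with: arq example.WorkerSettings
    functions = [say_hello]
    redis_settings = RedisSettings()


async def main():
    # jobs are enqueued by name, so the producer doesn't need
    # the function (or its dependencies) to be importable
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('say_hello', 'world')


if __name__ == '__main__':
    asyncio.run(main())
```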
Thanks for all the help @euri10, @steinitzu, @leosussan!
@nishtha03 you can try the project generation templates; those include Celery and show how to set everything up with Docker.
But if you can implement it yourself, I would definitely follow Leo's suggestion and try Arq. I just haven't been able to try it myself, but it seems great. And Arq's architecture is very well thought out; it allows you to distribute tasks and workers without requiring you to install all the dependencies everywhere.
@tiangolo: Celery looks nice, though I notice it does not support Windows (which wouldn't normally bother me, but you can't always decide what your application is going to be deployed on). Having something that can integrate with the web server to some degree (unlike cron jobs, systemd timers, and Windows Scheduled Tasks) has some degree of convenience if, say, some database is meant to be populated daily by a scheduled task and the server needs to be able to tell whether the task has already been run or not.
ARQ and APScheduler both look promising, though (thanks everyone for the recommendations); it might be worth giving them a mention in the docs. I feel like saying "just use Celery if the BackgroundTasks don't cut it for you" is a pretty tall step to climb and misses a lot of the intermediate solutions mentioned here.
Good point about Celery for Windows, @sm-Fifteen.
Well, in fact, I have been wanting to try ARQ and replace Celery in the project generators with it. And then suggest ARQ everywhere; in the end, it uses Pydantic, so it would probably be the best match for FastAPI too... :tada:
I first have to try it out, and I wanted to suggest (in a PR) handling normal `def` functions in a threadpool automatically (as it's done in FastAPI); that could help make it easier to adopt (and similar).
As ARQ is based on names (strings) instead of the function objects, it's the perfect match for modern distributed systems (e.g. Docker containers) that don't need to have everything installed in all the (possibly distributed) components. That's a huge advantage, for example when compared to the default model in Celery, RQ, Dramatiq, Huey. :sparkles:
The other "important" feature from other tools is an admin/monitor UI. As Celery's Flower or RQ's rq-dashboard
. But maybe we can build a FastAPI with a frontend for that :smile: :man_shrugging:
Hopefully, I'll be able to try all that soon :crossed_fingers:
@nishtha03 did you solve your problem?
To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn
https://github.com/samuelcolvin/arq/issues/120
Very happy to consider a monitor, but personally I'd recommend this takes the form of an API that existing monitoring tools can use; perhaps with a simple dashboard.
I found a package, fastapi-utils, that supports repeated tasks.
Thanks for the discussion here everyone!
Does that solve your question @nishtha03 ? May we close this issue?
@tiangolo I'm giving it a try.

> To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn

I'm giving it a try now... Thanks for the contribution.
This discussion is damn important. I'm working on the solution given by @leosussan; I'll probably bump this thread again in case of any issue :)
I realize I can run an async future with

```python
app = FastAPI(title='TestAPI', on_startup=[async_func])
```

How can I add a non-async blocking function to the worker from there? I just want to run a couple of batch files every night and process the output.

```python
subprocess.run('my_file.exe')
```

will block for some time, so I can't call it directly.
See `subprocess.Popen` instead of `subprocess.run`.
If I want to process the result, it will block, too. I was looking for a way to run a function in the thread pool, or however it is implemented in FastAPI.
@spacemanspiff2007 You can run a function in the same thread pool used by Starlette/FastAPI using fastapi_utils.tasks.repeat_every by @dmontagu.
With this `example.py`:

```python
import logging
import time

from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every

logger = logging.getLogger(__name__)

app = FastAPI()
counter = 0


@app.get('/')
def hello():
    return 'Hello'


@app.on_event("startup")
@repeat_every(seconds=1, logger=logger, wait_first=True)
def periodic():
    global counter
    print('counter is', counter)
    counter += 1
```
you get:

```
$ uvicorn example:app
INFO: Started server process [85016]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
counter is 0
counter is 1
counter is 2
counter is 3
```
The implementation is quite simple (source here): it runs the given function periodically, using Starlette's `starlette.concurrency.run_in_threadpool`. This is precisely how Starlette and FastAPI run background tasks (source here).
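So for the blocking batch-file case above, something like this should work (a sketch; `my_file.exe` is the example from the question):

```python
import subprocess

from starlette.concurrency import run_in_threadpool


async def run_batch_file():
    # subprocess.run blocks until the process exits, so hand it off
    # to the thread pool to keep the event loop responsive
    result = await run_in_threadpool(
        subprocess.run, ['my_file.exe'], capture_output=True
    )
    return result.stdout
```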
But since this is the same thread pool used to serve requests (for endpoints defined with `def`), if your task is heavy you may want to run it in a separate process. For example:

- launch it with `subprocess.Popen` and periodically check its status from FastAPI's thread pool using `repeat_every` (this could become messy when you have many tasks to check upon);
- or `crond` should do.
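A rough sketch of the `Popen`-plus-polling variant (assuming `repeat_every` from fastapi-utils, as above; how and when the jobs get launched is left out):

```python
import subprocess

from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every

app = FastAPI()
jobs = []  # Popen handles for launched batch jobs


def launch_batch_file():
    # Popen returns immediately; the process runs on its own
    jobs.append(subprocess.Popen(['my_file.exe'], stdout=subprocess.PIPE))


@app.on_event("startup")
@repeat_every(seconds=60)
def check_batch_jobs() -> None:
    # polled from the thread pool; finished processes report their output
    for proc in list(jobs):
        if proc.poll() is not None:
            output, _ = proc.communicate()
            print('batch job finished:', proc.returncode, output)
            jobs.remove(proc)
```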
Thank you very much for this detailed answer! `starlette.concurrency.run_in_threadpool` will definitely do the job!
@tiangolo I'd recommend considering an integration with the Advanced Python Scheduler. In my experience it offers a lot of features and is very stable/works very well. It offers many schedulers, job stores, executors, and triggers, has support for event listeners, and is extensible. Consequently, APScheduler is the go-to integration for Flask (flask-apscheduler) and Django (django-apscheduler) as well. Its test coverage of 94% is very good, too.
I agree with @fkromer.
I have been using APScheduler for scheduled tasks.
There are really good features, such as being able to see when the next schedule runs and cancelling schedules without rebooting the FastAPI process.
I have been pinning certain endpoints to the schedule and fixing the IDs so a job doesn't get scheduled twice.
This can be updated to be a query param or route.
Also, it would be worthwhile persisting the schedules in some way, as every time the FastAPI service restarts it doesn't automatically pick them up.
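For the persistence part, a job store would cover it; a minimal sketch (assuming APScheduler 3.x with SQLAlchemy installed):

```python
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# jobs survive restarts because they are stored in SQLite,
# not just in the scheduler's memory
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
Schedule = AsyncIOScheduler(jobstores=jobstores)
Schedule.start()
```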
Here is some basic bootstrap code.
Create the schedule object:

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

Schedule = AsyncIOScheduler()
Schedule.start()
```
View all scheduled jobs:

```python
@app.get("/schedule/show_schedules/", tags=["schedule"])
async def get_scheduled_syncs():
    schedules = []
    for job in Schedule.get_jobs():
        schedules.append({
            "Name": str(job.id),
            "Run Frequency": str(job.trigger),
            "Next Run": str(job.next_run_time),
        })
    return schedules
```
Add a scheduled job:

```python
# schedule1_function and schedule2_function are assumed to be defined elsewhere
@app.post("/schedule/schedule1/", tags=["schedule"])
async def add_chat_job_to_scheduler(time_in_seconds: int = 300):
    schedule1_job = Schedule.add_job(
        schedule1_function, 'interval', seconds=time_in_seconds, id="schedule1"
    )
    return {"Scheduled": True, "JobID": schedule1_job.id}


@app.post("/schedule/schedule2/", tags=["schedule"])
async def add_schedule2(time_in_hours: int = 24):
    schedule2_job = Schedule.add_job(
        schedule2_function, 'interval', hours=time_in_hours, id="schedule2"
    )
    return {"Scheduled": True, "JobID": schedule2_job.id}
```
Delete a scheduled job:

```python
@app.delete("/schedule/schedule1/", tags=["schedule"])
async def remove_schedule1():
    Schedule.remove_job("schedule1")
    return {"Scheduled": False, "JobID": "schedule1"}


@app.delete("/schedule/schedule2/", tags=["schedule"])
async def remove_schedule2():
    Schedule.remove_job("schedule2")
    return {"Scheduled": False, "JobID": "schedule2"}
```
I was using the schedule functionality to do SSL checks. The article and code can be found here: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e. Hopefully it helps others with similar problems.