FastAPI asynchronous background task blocks other requests?
I want to run a simple background task in FastAPI, which involves some computation before dumping it into the database. However, the computation would block it from receiving any more requests.
```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
db = Database()

async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data, otherdata)  # this blocks other requests
    await db.execute("some sql", newdata)

@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}
```
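To make the blocking behaviour concrete, here is a minimal sketch independent of FastAPI, using `time.sleep` as a stand-in for the long computation. It shows that a synchronous call inside an `async` function stalls every other coroutine on the event loop:

```python
import asyncio
import time

async def blocking_task():
    time.sleep(0.2)  # synchronous sleep: blocks the whole event loop

async def other_request():
    await asyncio.sleep(0)  # should return almost immediately
    return time.perf_counter()

async def main():
    start = time.perf_counter()
    # other_request cannot run until blocking_task yields control,
    # which it never does while time.sleep is running
    _, finished = await asyncio.gather(blocking_task(), other_request())
    return finished - start

if __name__ == "__main__":
    delay = asyncio.run(main())
    print(f"other request waited {delay:.2f}s")  # roughly 0.2s, not ~0s
```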
What is the best way to solve this issue?
Solution 1:[1]
Your task is defined as async, which means fastapi (or rather starlette) will run it in the asyncio event loop.
And because somelongcomputation is synchronous (i.e. not waiting on some IO, but doing computation), it will block the event loop for as long as it runs.
I see a few ways of solving this:
- Use more workers (e.g. `uvicorn main:app --workers 4`). This will allow up to 4 `somelongcomputation` calls in parallel.
- Rewrite your task to not be async (i.e. define it as `def task(data): ...` etc.). Starlette will then run it in a separate thread.
- Use `fastapi.concurrency.run_in_threadpool`, which will also run it in a separate thread. Like so:

  ```python
  from fastapi.concurrency import run_in_threadpool

  async def task(data):
      otherdata = await db.fetch("some sql")
      newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
      await db.execute("some sql", newdata)
  ```
- Or use asyncio's `run_in_executor` directly (which `run_in_threadpool` uses under the hood):

  ```python
  import asyncio

  async def task(data):
      otherdata = await db.fetch("some sql")
      loop = asyncio.get_running_loop()
      newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
      await db.execute("some sql", newdata)
  ```

  You could even pass in a `concurrent.futures.ProcessPoolExecutor` as the first argument to `run_in_executor` to run it in a separate process.
- Spawn a separate thread / process yourself, e.g. using `concurrent.futures`.
- Use something more heavy-handed like celery. (Also mentioned in the fastapi docs here).
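The process-pool variant of the `run_in_executor` option can be sketched end to end. This is only an illustration under assumptions: `somelongcomputation` is a stand-in CPU-bound function, the database calls are omitted, and the function must be a pickleable top-level `def` (a `ProcessPoolExecutor` cannot ship lambdas to worker processes):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def somelongcomputation(data, otherdata):
    # stand-in for a CPU-bound computation; must be a top-level
    # function so it can be pickled for the worker process
    return sum(i * i for i in range(data)) + otherdata

async def task(data, otherdata):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # runs in a separate process, so it cannot block the event loop
        return await loop.run_in_executor(pool, somelongcomputation, data, otherdata)

if __name__ == "__main__":
    result = asyncio.run(task(10, 5))
    print(result)  # 285 + 5 = 290
```

Note that a process pool also sidesteps the GIL, so this is the right choice when the computation is CPU-bound rather than IO-bound.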
Solution 2:[2]
Read this issue.
Also in the example below, my_model.function_b could be any blocking function or process.
TL;DR
```python
from starlette.concurrency import run_in_threadpool

@app.get("/long_answer")
async def long_answer():
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst
```
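Since Python 3.9 the standard library offers `asyncio.to_thread`, which behaves like `run_in_threadpool` without depending on starlette's helper. A minimal sketch, using a hypothetical `function_b` as a stand-in for any blocking call:

```python
import asyncio
import time

def function_b(arg_1, arg_2):
    time.sleep(0.05)  # stand-in for any blocking work
    return arg_1 + arg_2

async def long_answer():
    # the blocking call runs in a worker thread,
    # keeping the event loop free for other requests
    return await asyncio.to_thread(function_b, 1, 2)

if __name__ == "__main__":
    print(asyncio.run(long_answer()))  # 3
```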
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Zhivar Sourati |
