'Async SqlAlchemy with FastAPI: Getting single session for all requests
I have recently migrated a REST API coded with FastApi to the new SQLAlchemy 1.4+ Async Version. My app compiles correctly and the database seems to setup just fine. The problem appears when I try to execute multiple requests, an error appears that seems to indicate that the same session is being used for all my requests. I've put the Error message at the end
Here's my code, I've based myself on the SQLAlchemy Async Docs and on this example
App Engine initialization
from typing import AsyncIterator
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from .notification import Notification
from .devices import Device
from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import create_engine
from app.core.config import Config
from app.core.constants import logger
import asyncio
engine = create_async_engine(Config.RDS_DB_URL)
metadata = MetaData(engine)
#if not engine.dialect.has_table(engine, C.NOTIFICATIONS):
#Base.metadata.create_all(engine)
async def init_connection():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(init_connection())
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async def get_session() -> AsyncIterator[AsyncSession]:
async with async_session() as session:
yield session
Endpoints using the session depends loading
@registry_router.get(R.REGISTRY_PRESCRIPTION_ID, tags=[TAGS.REGISTRY])
async def get_patient_prescription_by_id(id: str, x_authorization: str = Header(None),
session: AsyncSession = Depends(get_session)):
#Other code
return (await session.execute(select(Prescription).where(Prescription.id==id,
Prescription.customerId==customerId))).scalars().first()
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/hypercorn/asyncio/context.py", line 28, in _handle
await invoke_asgi(app, scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/hypercorn/utils.py", line 219, in invoke_asgi
await app(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/fastapi/applications.py", line 208, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/starlette/applications.py", line 112, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc
File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/cors.py", line 84, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 82, in __call__
raise exc
File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 656, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 259, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 61, in app
response = await func(request)
File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 227, in app
dependant=dependant, values=values, is_coroutine=is_coroutine
File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 159, in run_endpoint_function
return await dependant.call(**values)
File "/home/ubuntu/rest/app/api/registry.py", line 504, in get_entity_schedules
for row in (await data).items:
File "/usr/local/lib/python3.7/dist-packages/fastapi_pagination/ext/async_sqlalchemy.py", line 23, in paginate
total = await session.scalar(select(func.count()).select_from(query.subquery())) # type: ignore
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 230, in scalar
**kw
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 206, in execute
**kw
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/session.py", line 1689, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1611, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
self, multiparams, params, execution_options
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1488, in _execute_clauseelement
cache_hit=cache_hit,
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1843, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 2024, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1800, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 451, in execute
self._prepare_and_execute(operation, parameters)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 70, in await_only
return current.driver.switch(awaitable)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 123, in greenlet_spawn
value = await result
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 379, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 687, in _start_transaction
self._handle_exception(error)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 653, in _handle_exception
raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
[SQL: SELECT count(*) AS count_1
FROM (SELECT schedules."profileId" AS "profileId", schedules."customerId" AS "customerId", schedules."practitionerId" AS "practitionerId", schedules.practitioner AS practitioner, schedules.patient AS patient, schedules.id AS id, schedules.reason AS reason, schedules.comments AS comments, schedules.state AS state, schedules.duration AS duration, schedules.time AS time, schedules.ts AS ts, schedules.responses AS responses, schedules."attentionType" AS "attentionType", schedules.history AS history, tb_user.specialty AS specialty
FROM schedules LEFT OUTER JOIN tb_user ON CAST(tb_user.id AS VARCHAR) = schedules."practitionerId"
WHERE (schedules."practitionerId" = %s OR (EXISTS (SELECT 1
FROM participants, tb_user
WHERE schedules.id = participants."scheduleId" AND tb_user.id = participants."participantId" AND tb_user.id = %s))) AND schedules."customerId" = %s AND schedules.ts BETWEEN %s AND %s ORDER BY schedules.ts DESC) AS anon_1]
[parameters: ('c42a1400-4534-11eb-8918-eb0d5241f5a7', 'c42a1400-4534-11eb-8918-eb0d5241f5a7', '8f69db20-4533-11eb-8918-eb0d5241f5a7', 1632970800000, 1635735599000)]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
I've tried to get the session to run on each request in a variety of ways with no success. Any clues on what I'm missing?
Solution 1:[1]
Late to answer but it might help others
You'll have to create a session variable within your generator function, like below
async_session = sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession
)
async def get_session() -> AsyncIterator[AsyncSession]:
async with async_session() as session:
yield session
This way there is a new session created for each call on get_session()
you can also refer the below article for a better understanding and setup
Solution 2:[2]
When two requests come in parallel, and when the first request awaits at some DB operation, it hands over the control to the second request, and the second request wants to do so DB operation as well, it will find there is someone using the DB connection, the error occurs.
I solved this issue by using async_scoped_session
async_session = async_scoped_session(
sessionmaker(
engine,
class_=AsyncSession,
),
scopefunc=current_task,
)
see the document
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 |