'Async SqlAlchemy with FastAPI: Getting single session for all requests

I have recently migrated a REST API coded with FastApi to the new SQLAlchemy 1.4+ Async Version. My app compiles correctly and the database seems to setup just fine. The problem appears when I try to execute multiple requests, an error appears that seems to indicate that the same session is being used for all my requests. I've put the Error message at the end

Here's my code, I've based myself on the SQLAlchemy Async Docs and on this example

App Engine initialization

from typing import AsyncIterator
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

from .notification import Notification
from .devices import Device



from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import create_engine

from app.core.config import Config
from app.core.constants import logger
import asyncio

engine = create_async_engine(Config.RDS_DB_URL)
metadata = MetaData(engine)

#if not engine.dialect.has_table(engine, C.NOTIFICATIONS):
#Base.metadata.create_all(engine)

async def init_connection():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    

asyncio.run(init_connection())

async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def get_session() -> AsyncIterator[AsyncSession]:
    async with async_session() as session:
        yield session




Endpoints using the session depends loading

@registry_router.get(R.REGISTRY_PRESCRIPTION_ID, tags=[TAGS.REGISTRY])
async def get_patient_prescription_by_id(id: str, x_authorization: str = Header(None), 
                session: AsyncSession = Depends(get_session)):
    #Other code
    return (await session.execute(select(Prescription).where(Prescription.id==id, 
                Prescription.customerId==customerId))).scalars().first()

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/hypercorn/asyncio/context.py", line 28, in _handle
    await invoke_asgi(app, scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/hypercorn/utils.py", line 219, in invoke_asgi
    await app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 82, in __call__
    raise exc
  File "/usr/local/lib/python3.7/dist-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/dist-packages/starlette/routing.py", line 61, in app
    response = await func(request)
  File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 227, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/usr/local/lib/python3.7/dist-packages/fastapi/routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/ubuntu/rest/app/api/registry.py", line 504, in get_entity_schedules
    for row in (await data).items:
  File "/usr/local/lib/python3.7/dist-packages/fastapi_pagination/ext/async_sqlalchemy.py", line 23, in paginate
    total = await session.scalar(select(func.count()).select_from(query.subquery()))  # type: ignore
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 230, in scalar
    **kw
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/ext/asyncio/session.py", line 206, in execute
    **kw
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/session.py", line 1689, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1611, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1488, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1843, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 2024, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1800, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 717, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 451, in execute
    self._prepare_and_execute(operation, parameters)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 70, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 123, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 379, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 687, in _start_transaction
    self._handle_exception(error)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 653, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
[SQL: SELECT count(*) AS count_1
FROM (SELECT schedules."profileId" AS "profileId", schedules."customerId" AS "customerId", schedules."practitionerId" AS "practitionerId", schedules.practitioner AS practitioner, schedules.patient AS patient, schedules.id AS id, schedules.reason AS reason, schedules.comments AS comments, schedules.state AS state, schedules.duration AS duration, schedules.time AS time, schedules.ts AS ts, schedules.responses AS responses, schedules."attentionType" AS "attentionType", schedules.history AS history, tb_user.specialty AS specialty
FROM schedules LEFT OUTER JOIN tb_user ON CAST(tb_user.id AS VARCHAR) = schedules."practitionerId"
WHERE (schedules."practitionerId" = %s OR (EXISTS (SELECT 1
FROM participants, tb_user
WHERE schedules.id = participants."scheduleId" AND tb_user.id = participants."participantId" AND tb_user.id = %s))) AND schedules."customerId" = %s AND schedules.ts BETWEEN %s AND %s ORDER BY schedules.ts DESC) AS anon_1]
[parameters: ('c42a1400-4534-11eb-8918-eb0d5241f5a7', 'c42a1400-4534-11eb-8918-eb0d5241f5a7', '8f69db20-4533-11eb-8918-eb0d5241f5a7', 1632970800000, 1635735599000)]
(Background on this error at: https://sqlalche.me/e/14/rvf5)

I've tried to get the session to run on each request in a variety of ways with no success. Any clues on what I'm missing?



Solution 1:[1]

Late to answer but it might help others

You'll have to create a session variable within your generator function, like below

async_session = sessionmaker(
         engine, 
         expire_on_commit=False, 
         class_=AsyncSession
         )
async def get_session() -> AsyncIterator[AsyncSession]:

    async with async_session() as session:
        yield session

This way there is a new session created for each call on get_session()

you can also refer the below article for a better understanding and setup

https://testdriven.io/blog/fastapi-sqlmodel/

Solution 2:[2]

When two requests come in parallel, and when the first request awaits at some DB operation, it hands over the control to the second request, and the second request wants to do so DB operation as well, it will find there is someone using the DB connection, the error occurs.

I solved this issue by using async_scoped_session

async_session = async_scoped_session(
    sessionmaker(
        engine,
        class_=AsyncSession,
    ),
    scopefunc=current_task,
)

see the document

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2