'SqlAlchemy asyncio orm: How to query the database
In SqlAlchemy async orm engine how do I query a table and get a value or all?
I know from the non async methods that I can just do
SESSION.query(TableClass).get(x)
but trying this with the async methods it throws the next error:
AttributeError: 'AsyncSession' object has no attribute 'query'.
Here's my SESSION variable defined. The LOOP variable is just asyncio.get_event_loop()
used to start async methods when my sql module is loaded and populate variables used as a cache to avoid caching the database every time I need somethin:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from .. import CONFIG, LOOP
def _build_async_db_uri(uri):
if "+asyncpg" not in uri:
return '+asyncpg:'.join(uri.split(":", 1))
return uri
async def start() -> declarative_base:
engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async with engine.begin() as conn:
BASE.metadata.bind = engine
await conn.run_sync(BASE.metadata.create_all)
return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))
BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())
Here's an example of table and cache function:
class TableClass:
__tablename__ = "tableclass"
id = Column(Integer, primary_key = True)
alias = Column(Integer)
CACHE = {}
async def _load_all():
global CACHE
try:
curr = await SESSION.query(TableClass).all()
CACHE = {i.id: i.alias for i in curr}
LOOP.run_until_complete(_load_all())
Solution 1:[1]
session.query is the old API. The asynchronous version uses select and accompanying methods.
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async_session = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
CACHE = {}
async def _load_all():
global CACHE
try:
async with async_session() as session:
q = select(TableClass)
result = await session.execute(q)
curr = result.scalars()
CACHE = {i.id: i.alias for i in curr}
except:
pass
LOOP.run_until_complete(_load_all())
You can read more about SqlAlchemy Asynchronous I/O here
Solution 2:[2]
New in version 1.4: Added Session.get(), which is moved from the now deprecated Query.get() method.
id = 1
user = await session.get(User, id)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | SimfikDuke |
Solution 2 | Ramon Dias |