SQLAlchemy: JOIN between different databases AND using different files in a module

Stack

I am using:

  • Python 3.10.x
  • FastAPI 0.75.x
  • SQLAlchemy 1.4.3x

Summary

I am building a unifying FastAPI project for several legacy databases (stored back-end on MariaDB 10.3 - structure has to be retained for some legacy software).

My SQLA setup uses a databases module to do the following:

/databases.py

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from . import models

dotenv.load_dotenv()

engines = {
    'parts': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300),
    'shop': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/shop", pool_pre_ping=True, pool_recycle=300),
    'purchasing': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/purchasing", pool_pre_ping=True, pool_recycle=300),
    "company": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/company", pool_pre_ping=True, pool_recycle=300),
    "auth": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/auth", pool_pre_ping=True, pool_recycle=300),
}

DBSession = sessionmaker(autocommit=False, autoflush=False, binds={
    # Catalogue
    models.Shop.Catalogue: engines["shop"],
    models.Shop.Sections: engines["shop"],
    models.Shop.Orders: engines["shop"],
    # ...
    # Parts
    models.Parts.Part: engines["parts"],
    models.Parts.BinLocations: engines["parts"],

    # ...
    # Purchasing
    models.Purchasing.SupplierOrder: engines["purchasing"],
    # Company Data
    models.Company.Staffmember: engines["company"],
    models.Company.Suppliers: engines["company"],
    # API Auth
    models.Auth.User: engines["auth"],
    models.Auth.Privileges: engines["auth"],
})

# Dependency
def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()

It's a little laborious having to do this for every model but it does work.

As I have several databases, I thought it would be logical to create a models module with a sub-file for each one, e.g. models.Parts, models.Shop, models.Purchasing, models.Company, models.Auth etc.

/models/__init__.py


from importlib.metadata import metadata
from sqlalchemy.orm import declarative_base

base = declarative_base()

from . import Auth, Company, Parts, Purchasing, Shop

I can create relationships successfully by creating the Base object in the __init__.py of models and importing it into each sub-file. For example:

/models/Auth.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)

/models/Company.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmember(Base):
    __tablename__ = 'staffmembers'

    id = Column(Integer, ForeignKey("users.staffmember_id"), nullable=False, primary_key=True)
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

The following route works just fine:

demo.py


from fastapi import Depends

from sqlalchemy.orm import Session

from .. import app, databases, models

@app.get("/api/user/{id}")
async def read_items(id: int, db: Session=Depends(databases.getDb)):
    user = db.query(models.Auth.User).filter(
        models.Auth.User.id == id
    ).first()

    user.staffmember

    return user

Accessing this URL returns the following. (Yes, I'm aware this isn't secure; it is for illustrative purposes only, to show that the relationship functions!)

{
  "username": "mark",
  "passhash": "<my hash>",
  "enabled": 1,
  "email": "[email protected]",
  "id": 1,
  "staffmember_id": 5,
  "staffmember": {
    "order": 20,
    "name": "Mark",
    "email": "[email protected]",
    "kStaffmember": 5,
    "initial": "MB",
    "enabled": 1
  }
}

However, I want to use staffmember initials as a possible username, so when I query for a user in my OAuth authorize scripts I tried to use:


from sqlalchemy import or_
from sqlalchemy.orm import Session

from ..models import Auth, Company

# 'username' is provided by the auth script from the standard username/password OAuth fields

def get_user(db: Session, username: str):
    db_user_data = db.query(Auth.User).join(Company.Staffmember).filter(
        or_(
            Auth.User.username == username,
            Auth.User.email == username,
            Company.Staffmember.initial == username
        )
    ).first()
    return db_user_data

and I get an Exception:

(pymysql.err.ProgrammingError) (1146, "Table 'auth.staffmembers' doesn't exist")

Am I going about this whole thing the right way and is there a possible way around this issue?



Solution 1:[1]

If anyone stumbles over this and needs a rational answer: I took it to the SQLAlchemy GitHub discussions page and got a pretty sane answer back that helped me resolve it.

https://github.com/sqlalchemy/sqlalchemy/discussions/8027

In summary:

  • You don't need a separate engine for each database on the same MySQL/MariaDB server. A single engine connected to any one of the databases is enough; the session can then reach the other databases through schema-qualified table names.

My new databases.py:

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from . import models

dotenv.load_dotenv()

DBengine = create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300)

DBSession = sessionmaker(bind=DBengine, autocommit=False, autoflush=False)

# Dependency
def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()

  • If you use the above style, you need to be more explicit with your ForeignKey() statements, e.g. ForeignKey("<db>.<table>.<field>"), so that each one tells SQLAlchemy exactly which database and table to look in.

  • You will need to state the database name on each model as its schema, e.g. add __table_args__ = { "schema": "<database name>" }. Remember that this is the name of the database, not of the model or module in the Python SQLAlchemy code.

New /models/Auth.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users' #table is called 'users'
    __table_args__ = { "schema": "auth" } #database is called 'auth'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)

New /models/Company.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmember(Base):
    __tablename__ = 'staffmembers' #table is called 'staffmembers'
    __table_args__ = { "schema": "company" } #database is called 'company'

    id = Column(Integer, ForeignKey("auth.users.staffmember_id"), nullable=False, primary_key=True)
    # ForeignKey now needs to know the database AND table name for the field it refers to
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

Once you use this process, SQLAlchemy knows to prefix the correct database name in joins and relationships, and things should work fine.
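
One way to check the schema prefixing without touching a live server is to compile the query against the MySQL dialect and look at the generated SQL. The following is only a minimal sketch, not the code from the linked discussion: the models are trimmed-down stand-ins for the ones above, the String lengths and the helper name build_user_lookup are hypothetical, the ON clause is written out explicitly, and it uses select() rather than db.query() so that no engine or session is needed.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, or_, select
from sqlalchemy.dialects import mysql
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    __table_args__ = {"schema": "auth"}  # database is called 'auth'

    id = Column(Integer, primary_key=True)
    username = Column(String(256), nullable=False)
    email = Column(String(256), nullable=False)  # length is an assumption
    staffmember_id = Column(Integer, nullable=False)


class Staffmember(Base):
    __tablename__ = "staffmembers"
    __table_args__ = {"schema": "company"}  # database is called 'company'

    # Fully qualified target: <database>.<table>.<column>
    id = Column(Integer, ForeignKey("auth.users.staffmember_id"), primary_key=True)
    initial = Column(String(16), nullable=True)  # length is an assumption


def build_user_lookup(username: str):
    """Hypothetical helper: build the cross-database lookup as a SELECT."""
    return (
        select(User)
        .join(Staffmember, Staffmember.id == User.staffmember_id)
        .where(
            or_(
                User.username == username,
                User.email == username,
                Staffmember.initial == username,
            )
        )
    )


# Compile for MySQL/MariaDB only; no connection is opened here.
sql = str(build_user_lookup("MB").compile(dialect=mysql.dialect()))
print(sql)
```

In the printed statement both tables should appear with their database prefix (auth.users and company.staffmembers), which is what lets a single connection join across databases on the same server.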

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1