FastAPI @repeat_every throws 'Depends' object has no attribute 'query'

I am new to FastAPI. I want to use repeat_every() to periodically generate bills from sensor readings. I wrote the following code, but I get the error 'Depends' object has no attribute 'query' when the function is called by repeat_every.

Here is my sample code. main.py:

get_db = database.get_db

@app.on_event("startup")
@repeat_every(seconds=60 * 3, wait_first=True)
def billCreation(db: Session = None):
    """Generate bills every three minutes, starting after the first interval.

    FastAPI only resolves ``Depends(...)`` defaults while handling a request.
    A startup hook wrapped by ``repeat_every`` is called with no arguments, so
    a ``Depends`` default would reach ``generateBills`` unresolved and fail
    with "'Depends' object has no attribute 'query'". The session is therefore
    opened here on every run instead of being injected.

    Args:
        db: Optional session to use instead of opening a new one. The
            scheduler never supplies it.
    """
    from contextlib import contextmanager

    print("Billing generation is running every 3 mins")
    if db is not None:
        billing.generateBills(db)
        return
    # NOTE(review): assumes database.get_db is a generator function that
    # yields a Session and cleans it up afterwards (the usual FastAPI
    # dependency shape) -- confirm against database.py.
    with contextmanager(get_db)() as session:
        billing.generateBills(session)

billing.py

from sqlalchemy.orm import Session
from app.models import model 
from datetime import datetime, timedelta
from sqlalchemy import and_

def generateBills(db: Session):
    """Create one ``Cost`` row per sensor from that sensor's readings.

    For every sensor, the volumes of its readings dated on or before fifteen
    days ago are summed, priced with the most recently added rate, and stored
    as a new ``Cost`` row.

    Args:
        db: Open SQLAlchemy session used for all queries and commits.
    """
    import logging

    fifteen_days_ago = datetime.today() - timedelta(days=15)
    rate = db.query(model.Rate).order_by(model.Rate.id.desc()).first()
    if rate is None:
        # No rate row exists, so no cost can be computed.
        logging.warning("No rate configured; skipping bill generation")
        return

    for sensor in db.query(model.Sensor).all():
        # Criteria are passed as separate arguments so SQLAlchemy ANDs them;
        # Python's `and` would evaluate the first expression's truthiness and
        # silently drop the date criterion. Readings are matched on the
        # sensorId foreign key, not on the reading's own primary key.
        readings = (
            db.query(model.SensorReadings)
            .filter(
                model.SensorReadings.sensorId == sensor.id,
                model.SensorReadings.dateAdded <= fifteen_days_ago,
            )
            .all()
        )
        # Computed per sensor (no carry-over between sensors); `volume` is
        # nullable, so missing values count as zero.
        volumeUsed = sum(reading.volume or 0 for reading in readings)
        totalCost = volumeUsed * rate.ratePerLitre

        new_cost = model.Cost(
            sensorId=sensor.id,
            volumeUsed=volumeUsed,
            ratePerLitre=rate.ratePerLitre,
            totalCost=totalCost,
        )
        db.add(new_cost)
        db.commit()
        db.refresh(new_cost)

model.py

class Sensor(Base):
    """ORM mapping for the ``sensors`` table."""

    __tablename__ = 'sensors'

    id = Column(Integer, index=True, primary_key=True)
    model = Column(String)
    # Set to the current time on insert and again on every update.
    dateAdded = Column(DateTime, onupdate=datetime.now, default=datetime.now)

class Rate(Base):
    """ORM mapping for the ``rates`` table (price per litre)."""

    __tablename__ = 'rates'

    id = Column(Integer, index=True, primary_key=True)
    ratePerLitre = Column(Float)
    # Set to the current time on insert and again on every update.
    dateAdded = Column(DateTime, onupdate=datetime.now, default=datetime.now)

class SensorReadings(Base):
    """ORM mapping for the ``sensorReadings`` table."""

    __tablename__ = 'sensorReadings'

    id = Column(Integer, index=True, primary_key=True)
    # Foreign key to the sensor that produced the reading.
    sensorId = Column(Integer, ForeignKey('sensors.id'))
    waterFlowRate = Column(Float, nullable=True)
    volume = Column(Float, nullable=True)
    # Set to the current time on insert and again on every update.
    dateAdded = Column(DateTime, onupdate=datetime.now, default=datetime.now)

class Cost(Base):
    """ORM mapping for the ``costs`` table (one generated bill per row)."""

    __tablename__ = 'costs'

    id = Column(Integer, index=True, primary_key=True)
    # Foreign key to the billed sensor.
    sensorId = Column(Integer, ForeignKey('sensors.id'))
    volumeUsed = Column(Float)
    ratePerLitre = Column(Float)
    totalCost = Column(Float)
    # Set to the current time on insert only.
    dateCreated = Column(DateTime, default=datetime.now)

What is wrong with my code? Is there an alternative way to do this with FastAPI?



Solution 1:[1]

Depends is a FastAPI feature: it refers to a callable that FastAPI invokes whenever the app is called by the server to handle a request, which is why it is called a dependency.

In your case, @repeat_every does not appear to be a FastAPI feature.

If you really want to start it every time the app starts, you can try the following. It assumes that @repeat_every is a function wrapper, in which case the wrapped function has to be called.

get_db = database.get_db

@app.on_event("startup")
async def startup(db: Session = None):
    """Start the periodic billing job when the application starts.

    Startup handlers are invoked without arguments and outside request
    handling, so a ``Depends(...)`` default would never be resolved here.
    Each run of the job opens its own session instead.

    Args:
        db: Optional session to use for every run instead of opening a new
            one. The framework never supplies it.
    """
    from contextlib import contextmanager

    @repeat_every(seconds=60 * 3, wait_first=True)
    def billCreation():
        print("Billing generation is running every 3 mins")
        if db is not None:
            billing.generateBills(db)
            return
        # NOTE(review): assumes get_db is a generator function that yields a
        # Session and cleans it up afterwards -- confirm against database.py.
        with contextmanager(get_db)() as session:
            billing.generateBills(session)

    # repeat_every wraps billCreation in a coroutine function; it must be
    # awaited for the periodic loop to be scheduled. A bare call only creates
    # a coroutine object that never runs.
    await billCreation()

Solution 2:[2]

To make this more flexible, I switched to Celery. Using repeat_every works, but if, say, an upgrade is done to your server, you need to be able to control when the job runs. With Celery, you can control the time your job runs. Following the module structure from the Celery docs, I have the following package:

app/celeryJobs/
   __init__.py
   tasks.py
   celeryWorker.py
   billing.py

and having the following lines of code.

tasks.py
from app.celeryJobs.celeryWorker import celery, celery_log
from app.emails import billingEmail
import time
import json
from json import JSONEncoder
from sqlalchemy.orm import Session
from app.emails import billingEmail
from app.celeryJobs import billing
from app.utils import database

@celery.task(name='app.celeryJobs.tasks.bill_creation')
def bill_creation():
    """Celery task: run bill generation and return a confirmation payload."""
    billing.generateBills()
    outcome = {"message": "Bill Generation runs"}
    return outcome
    
@celery.task(name='app.celeryJobs.tasks.sum_number')
def sum_number(x, y):
    """Celery task: add ``x`` and ``y``, log a marker line, return the sum."""
    total = x + y
    celery_log.info("I am suppose to run")
    return total

@celery.task(name='app.celeryJobs.tasks.sendBillingEmail')
def sendBillingEmail(email: str, fullName: str, month: str, amount: str):
    """Celery task: send a billing email and return the response as JSON text.

    Sleeps five seconds, logs a marker line, calls
    ``billingEmail.sendBilling`` and serialises ``dict(response)`` with a
    four-space indent.
    """
    time.sleep(5)
    celery_log.info("I am suppose to run")
    response = billingEmail.sendBilling(email, fullName, month, amount)
    payload = dict(response)
    return json.dumps(payload, indent=4)
celeryWorker.py

from celery import Celery
from celery.utils.log import get_task_logger
from celery.schedules import crontab


# Celery application for the app.celeryJobs package. Task definitions are
# loaded from app.celeryJobs.tasks when the worker starts.
celery = Celery(
    'app.celeryJobs',
    broker="pyamqp://guest@localhost//",
    include=['app.celeryJobs.tasks'],
)

celery.conf.update(
    task_serializer='json',
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # The key was misspelled "imprts" (an unrecognised setting) and its value
    # was a bare string because the tuple lacked a trailing comma.
    imports=("app.celeryJobs.tasks",),
)

celery_log = get_task_logger(__name__)


# Periodic tasks dispatched by `celery beat`.
celery.conf.beat_schedule = {
    # Demo task: runs every 30 seconds with fixed arguments.
    'sum_number': {
        'task': 'app.celeryJobs.tasks.sum_number',
        'schedule': 30.0,
        'args': (16, 16),
    },
    # Bill generation: on the hour at 00:00 and 13:00 through 23:00 (UTC, per
    # the timezone setting above). Same hours as the original comma list.
    'bill_creation': {
        'task': 'app.celeryJobs.tasks.bill_creation',
        'schedule': crontab(minute=0, hour='0,13-23'),
    },
}
billing.py
from sqlalchemy.orm import Session
from app.models import model, schemas
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.emails import billingEmail
import calendar
from app.celeryJobs import tasks

from sqlalchemy import create_engine  
from sqlalchemy import Column, String  
from sqlalchemy.ext.declarative import declarative_base  
from sqlalchemy.orm import sessionmaker
from app.utils import config


import logging

def generateBills():
    """Compute a bill per sensor and queue a billing email for its user.

    Opens its own database session from the configured URI. For every sensor,
    the volumes of its readings dated on or before fifteen days ago are
    summed and priced with the most recently added rate; if a user is linked
    to the sensor, a ``sendBillingEmail`` task is queued with the total.
    """
    # Database connection. The engine is created per call, so it is disposed
    # again in the `finally` block below.
    db_string = config.settings.SQLALCHEMY_DATABASE_URI
    con = create_engine(db_string)
    # Named so it does not shadow the imported sqlalchemy.orm.Session class.
    session_factory = sessionmaker(con)
    db = session_factory()
    try:
        fifteen_days_ago = datetime.today() - timedelta(days=15)
        sensors = db.query(model.Sensor).all()
        print('days', fifteen_days_ago)
        print('leng sensors', len(sensors))
        rate = db.query(model.Rate).order_by(model.Rate.id.desc()).first()
        if rate is None:
            # No rate row exists, so no cost can be computed.
            logging.warning("No rate configured; skipping bill generation")
            return
        print('rate', rate.ratePerLitre)

        for senData in sensors:
            # Criteria are passed as separate arguments so SQLAlchemy ANDs
            # them; Python's `and` would silently drop the date criterion.
            readings = (
                db.query(model.SensorReadings)
                .filter(
                    model.SensorReadings.sensorId == senData.id,
                    model.SensorReadings.dateAdded <= fifteen_days_ago,
                )
                .all()
            )
            user = (
                db.query(model.User)
                .filter(model.User.sensorId == senData.id)
                .first()
            )
            print('leng readings', len(readings))
            # `volume` is nullable; missing values count as zero.
            volumeUsed = sum(reading.volume or 0 for reading in readings)
            print('volume', volumeUsed)
            totalCost = volumeUsed * rate.ratePerLitre
            print('id', senData.id)
            print('totalcost', totalCost)
            new_cost = model.Cost(
                sensorId=senData.id,
                volumeUsed=volumeUsed,
                ratePerLitre=rate.ratePerLitre,
                totalCost=totalCost,
            )
            print(new_cost)
            # NOTE(review): persistence is disabled here, so the bill is
            # computed and emailed but never stored -- confirm this is intended.
            #   db.add(new_cost)
            #   db.commit()
            #   db.refresh(new_cost)
            if user is not None:
                # Read the month from the datetime directly. Round-tripping
                # through str()/strptime with "%f" fails whenever the stored
                # value has zero microseconds.
                month = calendar.month_name[senData.dateAdded.month]
                print('month', month)
                print('username', user.email)
                tasks.sendBillingEmail.delay(
                    user.email, user.fullName, month, str(totalCost)
                )
            logging.info("Cost bill generate for sensor")
    finally:
        # Close once, after all sensors are processed and also on error,
        # instead of inside the loop.
        db.close()
        con.dispose()

Make sure your Python virtual environment is activated, then run the following commands in separate terminals to run the Celery periodic tasks.

# Scheduler: dispatches the tasks listed in beat_schedule at their scheduled times.
celery -A app.celeryJobs.celeryWorker beat --loglevel=info
# Worker: executes the dispatched tasks, using the eventlet pool.
celery -A app.celeryJobs.celeryWorker worker -l info -P eventlet

Ensure you read the celery docs for details on setting up Broker and Backend. Enjoy!

Alternatively, you can keep using repeat_every and avoid the 'Depends' object has no attribute 'query' error.

Make sure you connect to the database directly, as shown in the code below (billing.py):

from sqlalchemy.orm import Session
from app.models import model, schemas
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.emails import billingEmail
import calendar
from app.celeryJobs import tasks

from sqlalchemy import create_engine  
from sqlalchemy import Column, String  
from sqlalchemy.ext.declarative import declarative_base  
from sqlalchemy.orm import sessionmaker
from app.utils import config


import logging

def generateBills():
    """Compute a bill per sensor and queue a billing email for its user.

    Opens its own database session from the configured URI, so it can be
    called from a scheduler without an injected session. For every sensor,
    the volumes of its readings dated on or before fifteen days ago are
    summed and priced with the most recently added rate; if a user is linked
    to the sensor, a ``sendBillingEmail`` task is queued with the total.
    """
    # The engine is created per call, so it is disposed again in the
    # `finally` block below.
    db_string = config.settings.SQLALCHEMY_DATABASE_URI
    con = create_engine(db_string)
    # Named so it does not shadow the imported sqlalchemy.orm.Session class.
    session_factory = sessionmaker(con)
    db = session_factory()
    try:
        fifteen_days_ago = datetime.today() - timedelta(days=15)
        sensors = db.query(model.Sensor).all()
        print('days', fifteen_days_ago)
        print('leng sensors', len(sensors))
        rate = db.query(model.Rate).order_by(model.Rate.id.desc()).first()
        if rate is None:
            # No rate row exists, so no cost can be computed.
            logging.warning("No rate configured; skipping bill generation")
            return
        print('rate', rate.ratePerLitre)

        for senData in sensors:
            # Criteria are passed as separate arguments so SQLAlchemy ANDs
            # them; Python's `and` would silently drop the date criterion.
            readings = (
                db.query(model.SensorReadings)
                .filter(
                    model.SensorReadings.sensorId == senData.id,
                    model.SensorReadings.dateAdded <= fifteen_days_ago,
                )
                .all()
            )
            user = (
                db.query(model.User)
                .filter(model.User.sensorId == senData.id)
                .first()
            )
            print('leng readings', len(readings))
            # `volume` is nullable; missing values count as zero.
            volumeUsed = sum(reading.volume or 0 for reading in readings)
            print('volume', volumeUsed)
            totalCost = volumeUsed * rate.ratePerLitre
            print('id', senData.id)
            print('totalcost', totalCost)
            new_cost = model.Cost(
                sensorId=senData.id,
                volumeUsed=volumeUsed,
                ratePerLitre=rate.ratePerLitre,
                totalCost=totalCost,
            )
            print(new_cost)
            # NOTE(review): persistence is disabled here, so the bill is
            # computed and emailed but never stored -- confirm this is intended.
            #   db.add(new_cost)
            #   db.commit()
            #   db.refresh(new_cost)
            if user is not None:
                # Read the month from the datetime directly. Round-tripping
                # through str()/strptime with "%f" fails whenever the stored
                # value has zero microseconds.
                month = calendar.month_name[senData.dateAdded.month]
                print('month', month)
                print('username', user.email)
                tasks.sendBillingEmail.delay(
                    user.email, user.fullName, month, str(totalCost)
                )
            logging.info("Cost bill generate for sensor")
    finally:
        # Close once, after all sensors are processed and also on error,
        # instead of inside the loop.
        db.close()
        con.dispose()

Then in your main.py use the repeat_every to call the function as shown below:

@app.on_event("startup")
@repeat_every(seconds=60 * 60 * 24 * 7, wait_first=True)
def billCreation():
    """Run bill generation once every seven days.

    ``generateBills`` opens its own database session, so no session is
    injected here.
    """
    # The interval used to be 60*60*7, which is seven hours and contradicted
    # the "every 7 days" message below.
    print("Billing generation is running every 7 days")
    billing.generateBills()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 danangjoyoo
Solution 2 LARTEY JOSHUA