'FastAPI @repeat_every throws 'Depends' object has no attribute 'query'
I am new to FastAPI. I want to use repeat_every() to generate bills from some sensor reading periodically. I wrote the following code but I am getting 'Depends' object has no attribute 'query' if the function is called in the repeat_every.
here is my sample code
main.py
get_db = database.get_db
@app.on_event("startup")
@repeat_every(seconds= 60 * 3, wait_first= True)
def billCreation(db: Session = Depends(database.get_db)):
print("Billing generation is running every 3 mins")
billing.generateBills(db)
billing.py
from sqlalchemy.orm import Session
from app.models import model
from datetime import datetime, timedelta
from sqlalchemy import and_
def generateBills(db: Session ):
fifteen_days_ago = datetime.today() - timedelta(days = 15)
sensors = db.query( model.Sensor).all()
volumeUsed = 0
rate= db.query(model.Rate).order_by(model.Rate.id.desc()).first()
for sensor in sensors:
reading = db.query(model.SensorReadings).filter(model.SensorReadings.id == sensor.id and model.SensorReadings.dateAdded <= fifteen_days_ago).all()
volumeUsed += reading.volume
totalCost = volumeUsed * rate.ratePerLitre
new_cost = model.Cost(sensorId = sensor.id,
volumeUsed = volumeUsed,
ratePerLitre = rate.ratePerLitre,
totalCost = totalCost)
db.add(new_cost)
db.commit()
db.refresh(new_cost)
model.py
class Sensor(Base):
__tablename__ = 'sensors'
id = Column(Integer, primary_key =True, index = True)
model = Column(String)
dateAdded = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class Rate(Base):
__tablename__ = 'rates'
id = Column(Integer, primary_key = True, index =True)
ratePerLitre = Column(Float)
dateAdded = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class SensorReadings(Base):
__tablename__ = 'sensorReadings'
id = Column(Integer, primary_key =True, index = True)
sensorId = Column(Integer, ForeignKey('sensors.id'))
waterFlowRate = Column(Float, nullable=True)
volume = Column(Float, nullable=True)
dateAdded = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class Cost(Base):
__tablename__ = 'costs'
id = Column(Integer, primary_key = True, index =True)
sensorId = Column(Integer, ForeignKey('sensors.id'))
volumeUsed = Column(Float)
ratePerLitre = Column(Float)
totalCost = Column(Float)
dateCreated = Column(DateTime, default=datetime.now)
Please what is wrong with my code. Any alternative way to do this with FastAPI
Solution 1:[1]
Depends
is a FastAPI's feature, and it refers to a callable object whenever the app
is called by the server, thats why its called dependency.
In your case, @repeat_every
seems not belongs to FastAPI's feature.
if you really want to start it every time the app started maybe you can try this by assuming your @repeat_every
is a function wrapper, then it should be called.
get_db = database.get_db
@app.on_event("startup")
def startup(db: Session = Depends(get_db)):
@repeat_every(seconds= 60 * 3, wait_first= True)
def billCreation():
print("Billing generation is running every 3 mins")
billing.generateBills(db)
billCreation() # you can comment this if its not a wrapper
Solution 2:[2]
In order to make this very dynamic, I have to switch to python celery. Using repeat_every will work alright but assuming an upgrade is done to your server, you need to be able to have control over the job running period of time. With celery, you can control the time your job runs. So following the folder module structure from celery docs, I have the following module:
app/celeryJobs/
__init__py
tasks.py
celeryWorker.py
billing.py
and having the following lines of code.
tasks.py
from app.celeryJobs.celeryWorker import celery, celery_log
from app.emails import billingEmail
import time
import json
from json import JSONEncoder
from sqlalchemy.orm import Session
from app.emails import billingEmail
from app.celeryJobs import billing
from app.utils import database
@celery.task(name ='app.celeryJobs.tasks.bill_creation')
def bill_creation():
billing.generateBills()
return {"message": "Bill Generation runs"}
@celery.task(name ='app.celeryJobs.tasks.sum_number')
def sum_number(x, y):
add = x + y
celery_log.info("I am suppose to run")
return add
@celery.task(name ='app.celeryJobs.tasks.sendBillingEmail')
def sendBillingEmail(email:str, fullName: str, month: str, amount:str):
time.sleep(5)
celery_log.info("I am suppose to run")
respon = billingEmail.sendBilling( email, fullName, month, amount)
json_object = json.dumps(dict(respon), indent = 4)
return json_object
celeryWorker.py
from celery import Celery
from celery.utils.log import get_task_logger
from celery.schedules import crontab
celery = Celery('app.celeryJobs',
broker="pyamqp://guest@localhost//",
include=['app.celeryJobs.tasks']
)
celery.conf.update(
task_serializer='json',
result_serializer='json',
timezone='UTC',
enable_utc=True,
imprts=("app.celeryJobs")
)
celery_log = get_task_logger(__name__)
celery.conf.beat_schedule = {
'sum_number': {
'task': 'app.celeryJobs.tasks.sum_number',
'schedule': 30.0,
'args': (16, 16)
},
'bill_creation': {
'task': 'app.celeryJobs.tasks.bill_creation',
'schedule': crontab(minute=0,hour='13,14,15,16,17,18,19,20, 21, 22, 23, 0'),
},
}
billing.py
from sqlalchemy.orm import Session
from app.models import model, schemas
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.emails import billingEmail
import calendar
from app.celeryJobs import tasks
from sqlalchemy import create_engine
from sqlalchemy import Column, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.utils import config
import logging
def generateBills():
#Database Connection
db_string = config.settings.SQLALCHEMY_DATABASE_URI
con = create_engine(db_string)
Session = sessionmaker(con)
db = Session()
fifteen_days_ago = datetime.today() - timedelta(days = 15)
sensors = db.query( model.Sensor).all()
print('days',fifteen_days_ago)
print('leng sensors', len(sensors))
volumeUsed = 0
rate= db.query(model.Rate).order_by(model.Rate.id.desc()).first()
print('rate', rate.ratePerLitre)
for sensor in sensors:
senData = sensor
readings = db.query(model.SensorReadings).filter(model.SensorReadings.sensorId == senData.id and model.SensorReadings.dateAdded <= fifteen_days_ago).all()
user = db.query(model.User).filter(model.User.sensorId == senData.id).first()
print('leng readings',len(readings))
volumeUsed = sum(reading.volume for reading in readings)
print('volume',volumeUsed)
totalCost = volumeUsed * rate.ratePerLitre
print('id', senData.id)
print('totalcost', totalCost)
new_cost = model.Cost(sensorId = senData.id,
volumeUsed = volumeUsed,
ratePerLitre = rate.ratePerLitre,
totalCost = totalCost)
print(new_cost)
# db.add(new_cost)
# db.commit()
# db.refresh(new_cost)
if user is not None:
datem = datetime.strptime(str(senData.dateAdded), "%Y-%m-%d %H:%M:%S.%f")
month = calendar.month_name[datem.month]
print('month', month)
print('username',user.email)
# respon = billingEmail.sendBilling( user.email, user.fullName, month, str(totalCost))
# print(respon)
task = tasks.sendBillingEmail.delay( user.email, user.fullName, month, str(totalCost))
db.close()
logging.info("Cost bill generate for sensor")
Make sure your python virtual environment is activated, run the following commands on different terminals to run the celery periodic task.
celery -A app.celeryJobs.celeryWorker beat --loglevel=info
celery -A app.celeryJobs.celeryWorker worker -l info -P eventlet
Ensure you read the celery docs for details on setting up Broker and Backend. Enjoy!
Alternatively, to use repeat_every and avoid the Depends' object has no attribute 'query' error.
Make sure you connect to the database directly as show in the lines of code below:
billing.py
from sqlalchemy.orm import Session
from app.models import model, schemas
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.emails import billingEmail
import calendar
from app.celeryJobs import tasks
from sqlalchemy import create_engine
from sqlalchemy import Column, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.utils import config
import logging
def generateBills():
db_string = config.settings.SQLALCHEMY_DATABASE_URI
con = create_engine(db_string)
Session = sessionmaker(con)
db = Session()
fifteen_days_ago = datetime.today() - timedelta(days = 15)
sensors = db.query( model.Sensor).all()
print('days',fifteen_days_ago)
print('leng sensors', len(sensors))
volumeUsed = 0
rate= db.query(model.Rate).order_by(model.Rate.id.desc()).first()
print('rate', rate.ratePerLitre)
for sensor in sensors:
senData = sensor
readings = db.query(model.SensorReadings).filter(model.SensorReadings.sensorId == senData.id and model.SensorReadings.dateAdded <= fifteen_days_ago).all()
user = db.query(model.User).filter(model.User.sensorId == senData.id).first()
print('leng readings',len(readings))
volumeUsed = sum(reading.volume for reading in readings)
print('volume',volumeUsed)
totalCost = volumeUsed * rate.ratePerLitre
print('id', senData.id)
print('totalcost', totalCost)
new_cost = model.Cost(sensorId = senData.id,
volumeUsed = volumeUsed,
ratePerLitre = rate.ratePerLitre,
totalCost = totalCost)
print(new_cost)
# db.add(new_cost)
# db.commit()
# db.refresh(new_cost)
if user is not None:
datem = datetime.strptime(str(senData.dateAdded), "%Y-%m-%d %H:%M:%S.%f")
month = calendar.month_name[datem.month]
print('month', month)
print('username',user.email)
# respon = billingEmail.sendBilling( user.email, user.fullName, month, str(totalCost))
# print(respon)
task = tasks.sendBillingEmail.delay( user.email, user.fullName, month, str(totalCost))
db.close()
logging.info("Cost bill generate for sensor")
Then in your main.py
use the repeat_every to call the function as shown below:
@app.on_event("startup")
@repeat_every(seconds= 60*60*7, wait_first= True)
def billCreation():
print("Billing generation is running every 7 days")
billing.generateBills()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | danangjoyoo |
Solution 2 | LARTEY JOSHUA |