How do I run SQL queries on the Airflow metadata database from inside an Airflow DAG?
I'm looking for a way to extract the execution_date, start_date, end_date, etc. of the last successful instance of a task in a DAG, and then raise an error if some branch hasn't been triggered in, say, a week.
Is there a way to run SQL queries on the Airflow metadata database to find the last successful instance of a task and extract the necessary information from that entry? I have looked at the documentation but could not find anything useful.
Solution 1:[1]
You could create a PythonOperator task that queries the task instance state from the metastore. Airflow itself talks to the database through SQLAlchemy, a Python ORM framework, and you can use the same machinery from your own tasks. For example:
```python
import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; older releases use DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State

with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:
    first = EmptyOperator(task_id="first")

    # @provide_session creates a database session and passes it in as
    # `session` when the caller does not supply one.
    @provide_session
    def _fetch_last_successful_ti(session=None):
        # True if "first" has at least one successful task instance
        # whose execution_date falls within the last week.
        ti_exists_in_last_week = session.query(
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == "so_72230617",
                TaskInstance.task_id == "first",
                TaskInstance.state == State.SUCCESS,
                TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
            )
            .exists()
        ).scalar()

        # Failing this task surfaces the problem in the Airflow UI.
        if not ti_exists_in_last_week:
            raise Exception("No successful 'first' task instance found with execution_date in the last week.")

    fetch_last_successful_ti = PythonOperator(
        task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
    )

    first >> fetch_last_successful_ti
```
The trick here is the @provide_session decorator, which creates a database session and passes it to the function as the session argument. You can then use that session to query the database with Airflow's models and SQLAlchemy.
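The question also asks for the dates of the last successful run, whereas the query above only checks that one exists. As a hedged illustration of the equivalent raw SQL, here is a minimal sketch run against an in-memory SQLite database. The two tables are simplified, hypothetical stand-ins that assume an Airflow 2.2+ style layout, where task_instance points at dag_run through run_id and execution_date is stored on the dag_run row; check the real schema of your Airflow version before relying on this. The helper names last_successful_ti, check_recent_success and build_demo_db and all row values are made up for illustration, and SQLite is used only so the example is self-contained (a real metastore is usually PostgreSQL or MySQL).

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Hypothetical, heavily simplified stand-ins for Airflow metadata tables.
# The real dag_run and task_instance tables have many more columns.
SCHEMA = """
CREATE TABLE dag_run (
    dag_id TEXT, run_id TEXT, execution_date TEXT,
    PRIMARY KEY (dag_id, run_id)
);
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT,
    state TEXT, start_date TEXT, end_date TEXT
);
"""

# Most recent successful instance of one task. The join is needed because,
# in this assumed layout, execution_date lives on the DAG run row.
# Dates are stored as ISO-8601 strings in one format, so text ordering
# matches chronological ordering.
LAST_SUCCESS_SQL = """
SELECT dr.execution_date, ti.start_date, ti.end_date
FROM task_instance AS ti
JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
WHERE ti.dag_id = ? AND ti.task_id = ? AND ti.state = 'success'
ORDER BY dr.execution_date DESC
LIMIT 1
"""


def last_successful_ti(conn, dag_id, task_id):
    """Return (execution_date, start_date, end_date) as datetimes, or None."""
    row = conn.execute(LAST_SUCCESS_SQL, (dag_id, task_id)).fetchone()
    if row is None:
        return None
    return tuple(datetime.fromisoformat(value) for value in row)


def check_recent_success(conn, dag_id, task_id, now, max_age=timedelta(weeks=1)):
    """Raise RuntimeError unless the last success has execution_date >= now - max_age."""
    last = last_successful_ti(conn, dag_id, task_id)
    if last is None or last[0] < now - max_age:
        raise RuntimeError(
            f"No successful '{task_id}' run with execution_date in the last {max_age}."
        )
    return last


def build_demo_db():
    """In-memory database filled with made-up rows, for trying the query out."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [
            ("so_72230617", "scheduled__2022-05-01", "2022-05-01T00:00:00+00:00"),
            ("so_72230617", "scheduled__2022-05-02", "2022-05-02T00:00:00+00:00"),
        ],
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("so_72230617", "first", "scheduled__2022-05-01", "success",
             "2022-05-02T00:00:05+00:00", "2022-05-02T00:00:06+00:00"),
            ("so_72230617", "first", "scheduled__2022-05-02", "failed",
             "2022-05-03T00:00:05+00:00", "2022-05-03T00:00:09+00:00"),
        ],
    )
    return conn
```

With the demo rows, the failed 2022-05-02 run is ignored, so the last success is the 2022-05-01 run. check_recent_success passes for a "now" of 2022-05-06 and raises for 2022-05-12. Inside a DAG, a similar SELECT could in principle be sent through the session that @provide_session supplies (SQLAlchemy's session.execute with text() and named bind parameters instead of ?), but the ORM query in the answer is less tied to the exact table layout.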
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Bas Harenslak |