How do I run SQL queries on the Airflow metadata database from inside an Airflow DAG?

I'm looking for a way to extract the execution_date, start_date, end_date, etc. of the last successful run of a task in a DAG, and then raise an error if some branch hasn't been triggered in, say, a week.

Is there a way to run SQL queries on the Airflow metadata database to look up the last successful run of a task and extract the necessary information from that entry? I have looked at the documentation but could not find anything useful.



Solution 1:[1]

You could create a PythonOperator task that queries the task instance state from the metastore. Airflow internally queries the database via SQLAlchemy, a Python ORM framework. You can use this to query the database from your tasks too. For example:

import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State

with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:

    first = EmptyOperator(task_id="first")

    @provide_session
    def _fetch_last_successful_ti(session=None):
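        # EXISTS subquery: True if any successful "first" task instance
        # has an execution_date within the past week.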
        ti_exists_in_last_week = session.query(
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == "so_72230617",
                TaskInstance.task_id == "first",
                TaskInstance.state == State.SUCCESS,
                TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
            )
            .exists()
        ).scalar()

        if not ti_exists_in_last_week:
            raise Exception("No successful 'first' task instance found with execution_date in the last week.")

    fetch_last_successful_ti = PythonOperator(
        task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
    )

    first >> fetch_last_successful_ti

The trick here is the @provide_session decorator, which injects an initialized database session into the function, allowing you to query the database using Airflow's model classes and SQLAlchemy.
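
If you want the timestamps themselves rather than a yes/no existence check, you can also load the most recent successful TaskInstance row and read its fields directly. Here is a minimal sketch (the helper name is illustrative) that reuses the dag_id and task_id from the example above; it orders by start_date, a plain column on the task_instance table, which avoids depending on how execution_date is stored in your Airflow version:

from airflow.models import TaskInstance
from airflow.utils.session import provide_session
from airflow.utils.state import State

@provide_session
def _last_successful_ti_details(session=None):
    # Load the most recent successful "first" task instance,
    # ordered by when it started running.
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "so_72230617",
            TaskInstance.task_id == "first",
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.start_date.desc())
        .first()
    )
    if ti is None:
        raise Exception("No successful 'first' task instance found.")
    # The loaded TaskInstance exposes the fields asked about in the question.
    print(ti.execution_date, ti.start_date, ti.end_date)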

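Since the question literally asks about SQL, note that the same injected session also accepts raw SQL through SQLAlchemy's text() construct. A sketch, assuming the Airflow 2.2+ schema in which execution_date lives on the dag_run table (in older versions it is a column on task_instance itself):

from sqlalchemy import text

from airflow.utils.session import provide_session

@provide_session
def _last_successful_ti_raw_sql(session=None):
    # Plain SQL against the metastore; dag_run is joined in because
    # Airflow 2.2+ stores execution_date there, not on task_instance.
    row = session.execute(
        text(
            """
            SELECT dr.execution_date, ti.start_date, ti.end_date
            FROM task_instance ti
            JOIN dag_run dr ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id
            WHERE ti.dag_id = :dag_id
              AND ti.task_id = :task_id
              AND ti.state = 'success'
            ORDER BY dr.execution_date DESC
            LIMIT 1
            """
        ),
        {"dag_id": "so_72230617", "task_id": "first"},
    ).fetchone()
    if row is None:
        raise Exception("No successful 'first' task instance found.")
    print(row.execution_date, row.start_date, row.end_date)

Bear in mind that the metastore schema is an internal detail and can change between Airflow versions, so the ORM query above is generally the more stable option.
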
Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: Bas Harenslak