Is there a way to pause an Airflow DagRun?

Is there a way to pause a specific DagRun within Airflow?

I want to be able to have multiple simultaneously executing runs of a single DAG, and I want to be able to pause those runs individually at certain points.

The pause/unpause function seems to work only at the DAG level, and it pauses or unpauses all DagRuns for that DAG at once.

I want to be able to do this because I'd like to have some long-running async tasks, and I don't want to tie up a worker running an infinite sensor. Instead, I'd like to create a task that pauses the DAG run, with some other operation (such as an API call) unpausing it later.



Solution 1:[1]

Paused is not a DagRun state (https://github.com/apache/airflow/blob/c9023fad4287213e4d3d77f4c66799c762bff7ba/airflow/utils/state.py#L65), so there is no way to specifically pause a DagRun. You can pause a whole DAG, and I use that for handling long-running DAGs with some success. Another option is to leverage priorities, and ensure that multiple instances of your DAG can run at the same time. Here is some toy code that demonstrates the concept:

from airflow.decorators import dag, task, task_group
from airflow.utils.dates import days_ago


@dag(schedule_interval='@once', start_date=days_ago(1), max_active_runs=1)
def MyDAG(is_slow=False):
    @task_group()
    def build_tasks():
        # Slow runs get a low priority so fast runs can jump ahead of them.
        if is_slow:
            priority_weight = 1
        else:
            priority_weight = 100

        @task(priority_weight=priority_weight)
        def my_task():
            # your logic
            return

        return my_task()

    ti = build_tasks()
    return ti


dag = MyDAG()

is_slow can be passed as a parameter in the DagRun conf. Provided you can tell in advance that a specific run will take a long time, this approach ensures that subsequent, fast instances get priority and run ahead of the first slow run. Importantly, this approach only works if you have many available workers and the slow runs trigger far fewer task instances than there are workers.
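For illustration, here is a minimal sketch of one way to attach such a value to a DagRun's conf when triggering the run from another DAG (assuming Airflow 2.x import paths; MyDAG and the is_slow key come from the toy code above):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Trigger a run of MyDAG and mark it as slow via the DagRun conf.
trigger_slow_run = TriggerDagRunOperator(
    task_id='trigger_slow_run',
    trigger_dag_id='MyDAG',
    conf={'is_slow': True},
)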

Solution 2:[2]

To solve this issue, I use a Python function like the one below

import time


def pause_for_provisioning(**kwargs):
    """
    Creates a four-minute window to allow the startup scripts to complete their work.
    """
    time.sleep(240)
    return 0

and set up a DAG task to utilize it

# Airflow 1.10.x import path; in Airflow 2.x use airflow.operators.python instead.
from airflow.operators.python_operator import PythonOperator

provision_pause = PythonOperator(
    task_id='pause_for_provisioning',
    python_callable=pause_for_provisioning,
    provide_context=True,
)

and then wire that task into the pipeline wherever the pause is required. I use this specifically to pause the DAG after creating a VM whose startup script handles git clones, code libraries, and anything else that needs to be on the machine for the cloned repo to function properly, but it could just as easily be used for any other reason a pause is needed.
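As a minimal sketch of that wiring, assuming Airflow 1.10-style imports to match the snippet above, with hypothetical create_vm and run_workload placeholders standing in for your real VM-creation and workload tasks (pause_for_provisioning is the function defined earlier):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG('provisioning_example', start_date=days_ago(1), schedule_interval=None) as dag:
    create_vm = DummyOperator(task_id='create_vm')        # placeholder for the VM-creation task
    run_workload = DummyOperator(task_id='run_workload')  # placeholder for the work that needs the VM

    provision_pause = PythonOperator(
        task_id='pause_for_provisioning',
        python_callable=pause_for_provisioning,
        provide_context=True,
    )

    # The sleep-based pause sits between VM creation and the work that depends on it.
    create_vm >> provision_pause >> run_workload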

Solution 3:[3]

If this is about sensors, you are in luck because the solution was implemented in version 1.10.2: https://issues.apache.org/jira/browse/AIRFLOW-2747.

:param mode: How the sensor operates.
    Options are: ``{ poke | reschedule }``, default is ``poke``.
    When set to ``poke`` the sensor is taking up a worker slot for its
    whole execution time and sleeps between pokes. Use this mode if the
    expected runtime of the sensor is short or if a short poke interval
    is required.
    When set to ``reschedule`` the sensor task frees the worker slot when
    the criteria is not yet met and it's rescheduled at a later time. Use
    this mode if the time before the criteria is met is expected to be
    quite long. The poke interval should be more than one minute to
    prevent too much load on the scheduler.

Source: https://github.com/apache/airflow/blob/e62ad5333cbb56ae0f2001f0f79008a21c41a983/airflow/sensors/base_sensor_operator.py#L46
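For illustration, here is a minimal sketch of a sensor running in reschedule mode, assuming Airflow 2.x import paths and a hypothetical check_ready() helper that reports whether the external condition has been met:

from airflow.sensors.base import BaseSensorOperator


def check_ready():
    # Hypothetical helper: ask your external system whether the work is done.
    return False


class ProvisioningSensor(BaseSensorOperator):
    def poke(self, context):
        # Return True once the condition is met; returning False keeps waiting.
        return check_ready()


wait_for_provisioning = ProvisioningSensor(
    task_id='wait_for_provisioning',
    mode='reschedule',      # free the worker slot between pokes
    poke_interval=300,      # check every 5 minutes
    timeout=6 * 60 * 60,    # fail the task after 6 hours of waiting
)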

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Richard Rymer
Solution 2: Michael Janzen
Solution 3: bosnjak