Airflow - What do I do when I have a variable amount of work that needs to be handled by a DAG?

I have a sensor task that listens to files being created in S3.

After a poke I may have 3 files, after another poke I might have another 5 files.

I want to create a DAG (or multiple DAGs) that listens for work requests and creates other tasks or DAGs to handle that amount of work.

I wish I could access the XCom or dag_run variables from the DAG definition (see the pseudo-code below):


from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor


def wait_for_s3_data(ti, **kwargs):
    s3_wrapper = S3Wrapper()  # custom helper around S3 (not shown here)
    work_load = s3_wrapper.work()
    # work_load: {"filename1.json": "s3/key/filename1.json", ....}
    ti.xcom_push(key="work_load", value=work_load)
    return len(work_load) > 0

def get_work(dag_run, ti, **_):
    s3_wrapper = S3Wrapper()
    work_load = ti.xcom_pull(key="work_load")
    dag_run.conf["work_load"] = work_load
    s3_wrapper.move_messages_from_waiting_to_processing(work_load)

with DAG(
    "ListenAndCallWorkers",
    description="This DAG waits for work requests from S3",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@once",
    max_active_runs=1,
) as dag:

    wait_for_s3_data_sensor = PythonSensor(
        task_id="wait_for_s3_data",
        python_callable=wait_for_s3_data,
        timeout=60,
        poke_interval=30,
        retries=2,
        mode="reschedule",
    )

    get_data_task = PythonOperator(
        task_id="GetData",
        python_callable=get_work,
    )

    work_load = "{{ dag_run.conf['work_load'] }}"  # <--- I WISH I COULD DO THIS

    do_work_tasks = [
        TriggerDagRunOperator(
            task_id=f"TriggerDoWork_{work}",
            trigger_dag_id="Work",  # Ensure this equals the dag_id of the DAG to trigger
            conf={"work": keypath},
        )
        for work, keypath in work_load.items()
    ]

    wait_for_s3_data_sensor >> get_data_task >> do_work_tasks

I know I cannot do that.

I also tried to define my own custom MultiTriggerDAG operator (as in this answer: https://stackoverflow.com/a/51790697/1494511), but at that step I still don't have access to the amount of work that needs to be done.
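One workaround I am considering is to defer the fan-out to run time instead: a plain Python task that pulls the work_load XCom and triggers one "Work" run per file through the trigger_dag API. This is only a rough, untested sketch; the import path assumes Airflow 2.2+ and the run_id scheme is made up:

from airflow.api.common.trigger_dag import trigger_dag
from airflow.utils import timezone

def trigger_work_per_file(ti, **_):
    # pull the dict pushed by the sensor and start one "Work" run per file
    work_load = ti.xcom_pull(key="work_load") or {}
    for work, keypath in work_load.items():
        trigger_dag(
            dag_id="Work",
            run_id=f"work_{work}_{timezone.utcnow().isoformat()}",
            conf={"work": keypath},
        )

This callable would replace the list comprehension of TriggerDagRunOperators above (wired as a single PythonOperator), but I don't know whether calling trigger_dag from inside a task is considered good practice.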

Another idea:

I am considering building a DAG with N doWork tasks and passing work to up to N of them via XCom:


def get_work(ti, **_):
    s3_wrapper = S3Wrapper()
    work_load = ti.xcom_pull(key="work_load")
    # hand out at most N items, one per doWork slot
    for i, keypath in enumerate(work_load.values(), start=1):
        if i > N:
            break
        ti.xcom_push(key=f"work_{i}", value=keypath)
    s3_wrapper.move_messages_from_waiting_to_processing(dict(list(work_load.items())[:N]))

This idea would get the job done, but it sounds very inefficient.
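For completeness, this is roughly how I picture the consumer side of that idea: each of the N doWork slots pulls its own XCom key and skips itself when nothing was assigned to it. Again just an untested sketch; N and the task ids are made up:

from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

N = 10  # fixed upper bound on parallel doWork slots

def do_work_slot(slot, ti, **_):
    keypath = ti.xcom_pull(task_ids="GetData", key=f"work_{slot}")
    if keypath is None:
        # nothing was assigned to this slot on this run
        raise AirflowSkipException(f"No work for slot {slot}")
    print(f"Processing {keypath}")

do_work_tasks = [
    PythonOperator(
        task_id=f"DoWork_{i}",
        python_callable=do_work_slot,
        op_kwargs={"slot": i},
    )
    for i in range(1, N + 1)
]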

Related questions:

This is the same question as I have, but no code is presented on how to solve it:

Airflow: Proper way to run DAG for each file

This answer looks like it would solve the problem, but it seems to apply to Airflow versions lower than 2.2.2: How do we trigger multiple airflow dags using TriggerDagRunOperator?
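If upgrading past 2.2.2 were an option, I understand Airflow 2.3's dynamic task mapping is meant for exactly this kind of run-time fan-out, which might make the workarounds above unnecessary. A minimal TaskFlow-style sketch of what I mean (untested; list_work and do_work are illustrative names):

import pendulum
from airflow.decorators import dag, task

@dag(schedule_interval="@once", start_date=pendulum.datetime(2022, 1, 1), catchup=False)
def listen_and_do_work():

    @task
    def list_work():
        # stand-in for S3Wrapper().work(); one S3 key per file found
        return ["s3/key/filename1.json", "s3/key/filename2.json"]

    @task
    def do_work(keypath):
        print(f"Processing {keypath}")

    # one mapped task instance per key path, decided at run time
    do_work.expand(keypath=list_work())

listen_and_do_work()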



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
