Airflow 2 XCom in Task Groups

I have two tasks inside a TaskGroup that need to pull XCom values to supply the job_flow_id and step_id. Here's the code:

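    from airflow.utils.task_group import TaskGroup
    # Import paths for recent apache-airflow-providers-amazon releases; older
    # releases used ...operators.emr_add_steps and ...sensors.emr_step instead.
    from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
    from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

    # `dag` and `create_emr_step` are defined elsewhere in the DAG file.
    with TaskGroup('execute_my_steps') as execute_my_steps:

        config = {}        # some dictionary
        dependencies = {}  # another dictionary

        task_id = 'execute_spark_job_step'
        task_name = 'spark_job'

        add_step = EmrAddStepsOperator(
            task_id=task_id,
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            steps=create_emr_step(args=config, d=dependencies),
            aws_conn_id='aws_default',
            retries=3,
            dag=dag
        )

        wait_for_step = EmrStepSensor(
            task_id='wait_for_' + task_name + '_step',
            job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
            retries=3,
            dag=dag,
            mode='reschedule'
        )

        add_step >> wait_for_step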

The problem is that step_id does not render correctly. In the UI, the rendered template for wait_for_step shows step_id as 'None', even though the XCom return_value for execute_spark_job_step is there (this is the EMR step ID).

[Screenshot: wait_for_step rendered template]

[Screenshot: execute_spark_job_step XCom]

When I remove the TaskGroup, it renders fine and the step waits until the job enters the completed state.

I need this to be in a task group because I will be looping through a larger config file and creating multiple steps.

Why doesn't this work? Do I need a nested TaskGroup? I tried using a TaskGroup without the context manager and still no luck.



Solution 1:

TL;DR:

Your issue happens because, inside a TaskGroup, the task's ID is not task_id but group_id.task_id, so your code should be:

    task_ids=f"execute_my_steps.{task_id}"

=>

    step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='execute_my_steps." + task_id + "', key='return_value') }}",
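Because the Jinja template is just an ordinary Python string, a small helper can assemble it from the group and task IDs so you don't have to juggle nested quotes by hand. This is a sketch only: step_id_template is a hypothetical helper (not an Airflow API), and it assumes the default prefix_group_id=True.

```python
def step_id_template(dag_id: str, group_id: str, task_id: str) -> str:
    """Build the Jinja string that pulls the add-steps task's XCom.

    Assumes prefix_group_id=True, so the task's real ID inside the
    TaskGroup is "<group_id>.<task_id>" and task_ids must name that.
    """
    full_task_id = f"{group_id}.{task_id}"
    # Doubled braces in an f-string emit literal "{{" / "}}" for Jinja.
    return (
        f"{{{{ task_instance.xcom_pull(dag_id='{dag_id}', "
        f"task_ids='{full_task_id}', key='return_value') }}}}"
    )


template = step_id_template("my_dag", "execute_my_steps", "execute_spark_job_step")
print(template)
```

Alternatively, in Airflow 2.x an operator created inside a TaskGroup should already have the group prefix in its own task_id attribute, so concatenating add_step.task_id into the template may be simpler than rebuilding the ID; verify this against your Airflow version.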

Why this happens:

When a task is assigned to a TaskGroup, its ID is no longer just task_id; it becomes group_id.task_id to reflect this relationship. Task IDs must be unique within a DAG, but thanks to this prefix you can reuse the same task_id in different TaskGroups. If you don't want this behavior, you can disable it by setting prefix_group_id=False on your TaskGroup:

    from airflow.utils.task_group import TaskGroup

    with TaskGroup(
        group_id='execute_my_steps',
        prefix_group_id=False,
    ) as execute_my_steps:
        ...  # same tasks as before

With this setting, your original code works unchanged: each task's ID is simply its task_id, without the group_id prefix. Note that this also means it's up to you to make sure there are no duplicate task_ids in your DAG.
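To make the naming rule concrete, here is a small plain-Python model of it, not Airflow's actual implementation. effective_task_id and duplicate_ids are hypothetical helpers, and the model covers only a single, non-nested TaskGroup. It shows why prefix_group_id=False shifts the job of avoiding duplicate IDs onto you when a loop reuses the same task_id in several groups.

```python
from collections import Counter


def effective_task_id(group_id: str, task_id: str, prefix_group_id: bool = True) -> str:
    """Model of the ID a task ends up with inside a (non-nested) TaskGroup."""
    return f"{group_id}.{task_id}" if prefix_group_id else task_id


def duplicate_ids(tasks, prefix_group_id: bool = True):
    """Return the effective IDs that occur more than once.

    `tasks` is a list of (group_id, task_id) pairs.
    """
    counts = Counter(effective_task_id(g, t, prefix_group_id) for g, t in tasks)
    return sorted(task_id for task_id, n in counts.items() if n > 1)


# Two groups that each reuse the same task_id, as in a config-driven loop.
tasks = [("step_a", "execute_spark_job_step"), ("step_b", "execute_spark_job_step")]

print(duplicate_ids(tasks, prefix_group_id=True))   # → []
print(duplicate_ids(tasks, prefix_group_id=False))  # → ['execute_spark_job_step']
```

For a loop like the one described in the question, keeping the default prefix and giving each group a distinct group_id is usually the simpler way to keep IDs unique.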

Sources

Source: Stack Overflow. This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Solution 1: Elad Kalif