Airflow 2 XCom in Task Groups
I have two tasks inside a TaskGroup that need to pull xcom values to supply the job_flow_id and step_id. Here's the code:
with TaskGroup('execute_my_steps') as execute_my_steps:
    config = {some dictionary}
    dependencies = {another dictionary}
    task_id = 'execute_spark_job_step'
    task_name = 'spark_job'
    add_step = EmrAddStepsOperator(
        task_id=task_id,
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        steps=create_emr_step(args=config, d=dependencies),
        aws_conn_id='aws_default',
        retries=3,
        dag=dag
    )
    wait_for_step = EmrStepSensor(
        task_id='wait_for_' + task_name + '_step',
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
        retries=3,
        dag=dag,
        mode='reschedule'
    )
    add_step >> wait_for_step
The problem is that step_id does not render correctly. In the UI, the rendered template for wait_for_step shows step_id as 'None', even though the XCom return_value for execute_spark_job_step is there (this is the EMR step_id).
[Screenshot: wait_for_step rendered template]
When I remove the TaskGroup, it renders fine and the step waits until the job enters the completed state.
I need this to be in a task group because I will be looping through a larger config file and creating multiple steps.
Why doesn't this work? Do I need a nested TaskGroup? I tried using a TaskGroup without the context manager and still no luck.
Solution 1:[1]
TL;DR:
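Your issue is happening because the task's id is no longer task_id; it is group_id.task_id. The task_ids argument therefore has to be 'execute_my_steps.' followed by the value of task_id. Jinja does not evaluate Python f-strings, so the prefix has to be built into the string before it is rendered, using the same concatenation as in your code:
    step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='execute_my_steps." + task_id + "', key='return_value') }}",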
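Since the question plans to create many steps in a loop, it may help to build the templated string in one place. The following is a minimal plain-Python sketch (no Airflow import needed); xcom_pull_template is a hypothetical helper name, and the dag, group, and task ids are the ones from the question:

```python
def xcom_pull_template(dag_id, task_id, group_id=None, key="return_value"):
    """Build the Jinja string that pulls an XCom from a (possibly grouped) task.

    Hypothetical helper, not part of Airflow.
    """
    # Inside a TaskGroup with the default prefix_group_id=True, the task is
    # registered as "<group_id>.<task_id>", so that full id must be used.
    full_task_id = f"{group_id}.{task_id}" if group_id else task_id
    # Braces are doubled so the f-string emits literal "{{" and "}}" for Jinja.
    return (
        f"{{{{ task_instance.xcom_pull(dag_id='{dag_id}', "
        f"task_ids='{full_task_id}', key='{key}') }}}}"
    )


step_id_template = xcom_pull_template(
    dag_id="my_dag",
    task_id="execute_spark_job_step",
    group_id="execute_my_steps",
)
print(step_id_template)
```

The resulting string could then be passed as step_id to the sensor. Leaving group_id unset gives the plain form used for the 'emr' task, which lives outside the group.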
The explanation why it happens:
When a task is assigned to a TaskGroup, its id is no longer task_id but becomes group_id.task_id, to reflect this relationship.
In Airflow a task_id must be unique within a DAG, but with TaskGroups you can use the same task_id in different TaskGroups, because the group prefix keeps the full ids distinct.
If this behavior is not what you want, you can disable it by setting prefix_group_id=False in your TaskGroup:
with TaskGroup(
    group_id='execute_my_steps',
    prefix_group_id=False
) as execute_my_steps:
By doing so your code will work without changes. The task's id will simply be task_id, without the group_id prefix. Note that this also means it is up to you to make sure you don't have duplicated task_ids in your DAG.
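When task ids are generated in a loop and prefix_group_id=False is set, a quick check before building the DAG can catch collisions early. This is a minimal plain-Python sketch; find_duplicate_task_ids and the sample ids are hypothetical and not part of Airflow:

```python
from collections import Counter


def find_duplicate_task_ids(task_ids):
    """Return the task ids that appear more than once, in sorted order."""
    counts = Counter(task_ids)
    return sorted(task_id for task_id, n in counts.items() if n > 1)


# Hypothetical ids generated while looping over a config file.
planned_ids = [
    "execute_spark_job_step",
    "wait_for_spark_job_step",
    "execute_spark_job_step",  # a second loop iteration reused the same name
]

duplicates = find_duplicate_task_ids(planned_ids)
print(duplicates)
```

One way to avoid such collisions is to include a per-step name from the config in each task_id, as the question already does with task_name for the sensor.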
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Elad Kalif |