Airflow 2.3 - Dynamic Task Mapping using Operators
I've got a current implementation of some code which works fine, but only carries out a single check per dag run as I cannot feed through multiple results to downstream tasks. The new Airflow 2.3.0 dynamic task mapping seems to allow a set of tasks/operators to run with a list or dictionary of outputs from a previous task - https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html.
I have tried several ways to use this without success. Below is my current working implementation; note that it fetches only one result:
from datetime import datetime

from airflow import DAG

# CustomMySqlFetchDictionaryOperator and CustomMySQLIntervalCheckOperator
# are project-specific operators (their imports are omitted here).

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

with DAG(
    "test",
    default_args=default_args,
    description="Test",
    template_searchpath=["/airflow_home/dags/"],
    start_date=datetime(2022, 5, 3),
    catchup=False,
    tags=["test"],
    render_template_as_native_obj=True,
) as dag:
    operator_a = CustomMySqlFetchDictionaryOperator(
        task_id="operator_a",
        mysql_conn_id="default_mysql",
        database_schema="test",
        sql="abc.sql",
        fetch_size=1,
        params={"param_1_value": "param_1_key"},
    )

    operator_b = CustomMySQLIntervalCheckOperator(
        task_id="operator_b",
        mysql_conn_id="default_mysql",
        database_schema="test",
        sql="abc.sql",
        days_back=1,
        date_filter_column="date",
        ratio_formula="max_over_min",
        metrics_thresholds={"AVG(total_size)": 1.05},
        ignore_zero=False,
        # Pull the single fetched row and use its date.
        check_date="{{ ti.xcom_pull(task_ids='operator_a')[0]['date'] }}",
    )

    operator_a >> operator_b
An example of what the first operator returns is shown below:
[{'remote_path': 'data.txt',
  'date': datetime.date(2022, 5, 2),
  'last_modified': datetime.datetime(2022, 5, 3, 2, 47),
  'total_size': 3291050}]
The check_date parameter in the second operator uses index 0 of the list, since the first operator can only return one result (due to fetch_size=1). Because the DAG parameter render_template_as_native_obj is set to True, the dictionary is rendered correctly for the second operator and both tasks succeed.
However, when I adapt this to use the new dynamic task mapping, I cannot get the second operator to consume the values once fetch_size is increased above 1. The list then contains multiple dictionaries, and I want each one mapped to its own task instance of the same operator. Example of my attempt:
operator_b = CustomMySQLIntervalCheckOperator.partial(
    task_id="operator_b",
    mysql_conn_id="default_mysql",
    database_schema="test",
    sql="abc.sql",
    days_back=1,
    date_filter_column="date",
    ratio_formula="max_over_min",
    metrics_thresholds={"AVG(total_size)": 1.05},
    ignore_zero=False,
).expand(
    check_date="{{ ti.xcom_pull(task_ids='operator_a')['date'] }}",
)
I have also tried using XComArg via the operator_a.output notation (operator_a.output['date']) for the check_date parameter, but this fails with the error "XComArg only supports str lookup, received int".
Appreciate any help or guidance I can get here!
Thanks.
Solution 1:[1]
expand() receives a list and applies the mapping to each element, but when you call expand(check_date=" ") you are passing a string, right?
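One possible way to act on this, sketched under assumptions rather than taken from the original answer: add a small intermediate step that turns the list of row dictionaries returned by operator_a into a plain list of dates, and pass that list to expand(). The helper name extract_dates and the second sample row are hypothetical, and the Airflow wiring is shown only in comments because it depends on the asker's custom operators. It also assumes the custom operator accepts each date value as its check_date.

```python
import datetime


def extract_dates(rows):
    """Return one check_date value per row fetched by the upstream task."""
    return [row["date"] for row in rows]


# First row is the sample from the question; the second is made up for illustration.
sample_rows = [
    {"remote_path": "data.txt", "date": datetime.date(2022, 5, 2), "total_size": 3291050},
    {"remote_path": "data2.txt", "date": datetime.date(2022, 5, 1), "total_size": 3100000},
]

# One element per mapped task instance that expand() would create.
check_dates = extract_dates(sample_rows)

# Hypothetical wiring inside the DAG (not executed here; needs Airflow >= 2.3):
#
#   from airflow.decorators import task
#
#   dates = task(extract_dates)(operator_a.output)
#   operator_b = CustomMySQLIntervalCheckOperator.partial(
#       task_id="operator_b",
#       ...  # same fixed arguments as in the question
#   ).expand(check_date=dates)
```

Wrapping the helper as a task means expand() receives the output of an upstream task that resolves to a list at run time, instead of a template string that is never treated as a list.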
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Mostafa Wael |