How to render a .sql file with parameters in MySqlOperator in Airflow?
I need help passing parameters (an XCom value pushed by a previous task) to a SQL query in a .sql file. However, I am unable to do so using the "parameters" option, even though this option is able to render XCom values from the previous task. Let me know what I am doing wrong.
Thanks :)
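from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

start = EmptyOperator(
    task_id="start",
)
# The return value of the callable is pushed to XCom automatically.
fetch_cust_id = PythonOperator(
    task_id="fetch",
    python_callable=lambda: "C001",
)
update_orders = MySqlOperator(
    task_id="update",
    mysql_conn_id="mysql_default",
    database="my_db",
    sql="/update.sql",
    parameters={
        "custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
    },
)
start >> fetch_cust_id >> update_orders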
SQL file (update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
:(
Solution 1:[1]
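The parameters argument is used to pass "variables" to the database driver, which binds them when the query is executed. Its entries are not added to the Jinja context that Airflow uses to render the .sql file, which is why {{ custid }} does not resolve there. If you want to use parameters, you need to use the driver's placeholder syntax in the SQL.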
Example:
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
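To make the driver-side binding concrete, here is a minimal sketch that runs without Airflow or a MySQL server. It assumes Python's built-in sqlite3 module as a stand-in for the MySQL driver, and the table contents are made up for illustration. Note that sqlite3 writes named placeholders as :name, whereas with MySQL you would write %(name)s as in the example above.

```python
import sqlite3

# sqlite3 is only a stand-in for MySQL here (an assumption made so the
# sketch is runnable anywhere). The sample rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (custid TEXT, placed TEXT)")
conn.executemany(
    "INSERT INTO orders (custid, placed) VALUES (?, ?)",
    [("C001", "no"), ("C002", "no")],
)

# The driver binds the value at execute time and takes care of quoting,
# so the placeholder is written without surrounding quotes.
# sqlite3 uses :custid; a MySQL driver would use %(custid)s instead.
sql = "UPDATE orders SET placed = 'yes' WHERE custid = :custid"
parameters = {"custid": "C001"}
conn.execute(sql, parameters)

rows = conn.execute(
    "SELECT custid, placed FROM orders ORDER BY custid"
).fetchall()
print(rows)
```

Only the row for C001 is updated. The point is that the SQL text carries a driver placeholder rather than a Jinja expression, and the dictionary is handed to the driver alongside it.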
But in your case you want to template an XCom value, so there is no reason to use parameters at all. You want the rendering to be done by Airflow. You can just set it directly in the SQL, since sql is a template field:
UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Elad Kalif |