Airflow - call an operator inside a function
I'm trying to call an operator that is defined inside a function from a PythonOperator. It seems I missed something; can someone help me find out what?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0
}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1

with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
    python_task
Error:
[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
Racooneer suggested a workaround, but the issue is only partly resolved.
Thanks, Racooneer!
Removing default_args fixed the error, but I am still not able to see the bash command output.
Solution 1:[1]
I'm not exactly sure what you are trying to do, but the code you posted in the Python function doesn't actually execute the operator.
This should work just fine:
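from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def postgres_to_gcs():
    # Note: there is no dag=dag here!
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True
    )
    # Constructing the operator runs nothing; execute() actually runs the command.
    t1.execute(dict())

with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/15 * * * *',
    start_date=datetime(2018, 1, 1),
    catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )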
Note that operators are Python classes. When you create an operator inside a Python function, you only call the class constructor. To actually run the operator, you need to call its execute method.
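To make the constructor-versus-execute distinction concrete without needing an Airflow installation, here is a minimal sketch built on a hypothetical stand-in class, `HypotheticalBashOperator`. It is not Airflow's BashOperator; it only mimics the pattern of storing configuration in `__init__` and doing the work in `execute(context)`. It assumes a POSIX shell is available for `subprocess.run(..., shell=True)`.

```python
import subprocess


class HypotheticalBashOperator:
    """Hypothetical stand-in for an operator (NOT Airflow's BashOperator).

    The constructor only stores configuration; nothing runs until
    execute() is called, which mirrors how real operators behave.
    """

    def __init__(self, task_id, bash_command):
        self.task_id = task_id
        self.bash_command = bash_command
        self.ran = False  # becomes True only inside execute()

    def execute(self, context):
        # Run the command through the system shell and capture stdout.
        result = subprocess.run(
            self.bash_command, shell=True, capture_output=True, text=True, check=True
        )
        self.ran = True
        # Return the last line of output, roughly like BashOperator does
        # when it is configured to push its output to XCom.
        lines = result.stdout.strip().splitlines()
        return lines[-1] if lines else None


op = HypotheticalBashOperator(task_id="count_lines", bash_command="echo task1")
print(op.ran)            # False: building the object did not run anything
output = op.execute({})  # the command only runs here
print(op.ran, output)
```

The empty dict passed to `execute` plays the same role as `dict()` in the answer's code: a placeholder for the task context that the stand-in class never reads.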
Note: Using an operator inside another operator is not good practice. You should use hooks or create custom operators instead. You can read more about why in the following answer.
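As a rough illustration of the hook-style alternative, the sketch below moves the shell logic into a plain helper class that the `python_callable` calls directly, so no operator is constructed inside a task. `HypotheticalShellHook` is an invented name for illustration, not an Airflow class, and the sketch assumes a POSIX shell for `subprocess.run(..., shell=True)`.

```python
import subprocess


class HypotheticalShellHook:
    """Hypothetical hook-style helper (not part of Airflow).

    Reusable I/O logic lives in a plain class, so a python_callable or a
    custom operator can call it directly instead of nesting operators.
    """

    def run(self, command):
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, check=True
        )
        return result.stdout


def postgres_to_gcs():
    # Do the work through the helper; no nested operator is created.
    output = HypotheticalShellHook().run('echo "task1"')
    # Inside an Airflow task, printed output should appear in the task log.
    print(output, end="")
    # A value returned from a PythonOperator callable is pushed to XCom by default.
    return output.strip()
```

This callable can be wired up exactly as in the answer, with `PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)`; because the command's output is printed and returned, it would be visible in the task log and in XCom rather than lost inside a nested operator.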
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |