Defining complex workflow dependencies in the Airflow 2.0 TaskFlow API

Let's say I have the following dummy DAG:

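import calendar
import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.apache.livy.operators.livy import LivyOperator
from airflow.utils.dates import days_ago


# default_args is assumed to be defined elsewhere in the module.
@dag(default_args=default_args,
     schedule_interval=None,
     start_date=days_ago(2))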
def airflow_taskflow_api_dag():
    cur_day = '2020-01-01'

    @task()
    def A(current_day: str):
        return current_day + ' 10'

    @task(multiple_outputs=True)
    def B(current_date_time: str):
        timestamp = calendar.timegm(
            datetime.strptime(
                current_date_time, '%Y-%m-%d %H').timetuple())

        another_current_date_time_format = '2020-01-01T10:00:00'

        return {
            "timestamp": timestamp,
            "new_cur_date_time": another_current_date_time_format
        }

    @task()
    def C(livy_batch_id, start_time: str):
        logging.info("Executing task C {}".format(start_time))
        return "c_id"

    @task()
    def D(upstream_id, start_time: str):
        logging.info("Executing task D {}".format(start_time))

    # workflow dependencies
    get_current_time = A(cur_day)
    new_date_time = B(get_current_time)

    livy_task = LivyOperator(
        task_id='livy_task',
        file="dummy_file",
        class_name="dummy_class",
        args=[
            new_date_time['timestamp']
        ],
        driver_memory='1G',
        driver_cores='4',
        executor_memory='1G',
        executor_cores='4',
        num_executors='4',
        queue='default_queue',
        livy_conn_id='livy_conn_id'
    )

    c_task = C(livy_task.output, new_date_time['new_cur_date_time'])

    d_task = D(c_task, new_date_time['new_cur_date_time'])

airflow_taskflow_api_dag = airflow_taskflow_api_dag()

If I write my DAG like this (passing two parameters from upstream tasks), the dependency graph looks like this: [screenshot of the DAG graph view]

But I want to define the dependency with: A >> B >> livy_task >> C >> D

Is there a way to do this using the TaskFlow API? It seems that if I pass task B's output to livy_task, C and D, the three tasks will run in parallel.

Thanks!



Solution 1:[1]

Your current dependency graph could be represented in Airflow 1.10.* syntax as:

A >> B >> [C, D, livy_task]
livy_task >> C >> D

You wrote: "But I want to define the dependency with: A >> B >> livy_task >> C >> D"

According to this similar post, it's not possible to remove existing edges in this dependency graph, while keeping the existing operators.

Reason:
In the case of B >> D, the dependency exists because the output of B, new_date_time, is one of the inputs for D.

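There's no way (yet) to remove or hide those implicit edges in the UI, but they shouldn't affect your execution flow. The default trigger_rule is all_success, so C still waits for livy_task and D still waits for C, which means the tasks run in the order A, B, livy_task, C, D rather than in parallel.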
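
To check that the implicit edges do not change the run order, here is a minimal sketch that uses only Python's standard-library graphlib module (Python 3.9+), not Airflow itself. It models the upstream tasks from the graph above as a plain dict. The helper name runnable_waves is made up for illustration, and the sketch assumes every task succeeds, which is what all_success requires.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it waits for (its upstream tasks).
# This includes the implicit edges B -> C and B -> D that come from
# passing B's output into C and D.
upstream = {
    "A": set(),
    "B": {"A"},
    "livy_task": {"B"},
    "C": {"B", "livy_task"},
    "D": {"B", "C"},
}


def runnable_waves(deps):
    """Return successive groups of tasks that could run at the same time."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = runnable_waves(upstream)
print(waves)  # [['A'], ['B'], ['livy_task'], ['C'], ['D']]
```

Every wave contains exactly one task, so nothing runs in parallel: the extra edges are redundant with the chain A >> B >> livy_task >> C >> D.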

Extra Note:
If you want to add more edges, that's simple, since the TaskFlow API supports setting dependencies with the bitshift operators (e.g. livy_task >> d_task, where d_task is the object returned by calling D).

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: mts