'Airflow SimpleHttpOperator is not pushing to xcom

I have the following SimpleHttpOperator inside my dag:

extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/', # Some Api already configured and checked
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
        do_xcom_push=True,
    )

followed by a PythonOperator:

processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )

The function:

def _processing_user(ti):
    users = ti.xcom_pull(task_ids=['extracting_user'])
    if not len(users) or 'results' not in users[0]:
        raise ValueError(f'User is empty')

    **More function code**

When I execute airflow tasks test myDag extracting_user 2022-03-02 followed by airflow tasks test myDag processing_user 2022-03-02 I get the value error with users variable equals to an empty array.

I have tested extracting_user task alone and it gets the desired data from the API. I have already queried with sqlite xcom and it is an empty table.

I am using airflow 2.3.0



Solution 1:[1]

I solved the problem changing to the version 2.0.0 of airflow. It seems that the SimpleHttpOperator doesn't store the request response on the xcom table on 2.3.0 version

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1