Error in Airflow 2 even though my tasks have actually completed? ERROR - Could not serialize the XCom value into JSON

Hi all, so my DAG actually runs fine and all the outputs are working, but Airflow's UI does not change to success and the task fails with the issue below. Reading online, I have come across these two suggestions:

do_xcom_push=False

and that Xcom_push will be deprecated in Airflow version 2.0.

I'm just not sure how to actually set this up. Can anyone share any insight?

Full Code:

import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
    bqclient.query(query_string)
        .result()
        .to_dataframe(
        create_bqstorage_client=True,
    ))

manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},
                }

def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''

    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model
                      })


gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[],
    write_disposition="WRITE_TRUNCATE", )
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
    table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')

default_dag_args = {'owner': 'ME',
                    'start_date': datetime.datetime(2021, 12, 16),
                    }

with models.DAG('Test_Dag_V1',
                schedule_interval=None, #'45 10 * * *',
                default_args=default_dag_args) as dag:
    format_function = python_operator.PythonOperator(
        task_id='format_function',
        python_callable=format_data
    )

    map_manufacturer_model_function = python_operator.PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model,
        op_kwargs={'s': 'stringValue'}
    )


format_function >> map_manufacturer_model_function

Error in Airflow:

 [2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
    [2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
        self.xcom_push(key=XCOM_RETURN_KEY, value=result)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
        return func(*args, session=session, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
        XCom.set(
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
        value = XCom.serialize_value(value)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
        return json.dumps(value).encode('UTF-8')
      File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type Series is not JSON serializable


Solution 1:[1]

If your goal is to use the output of the map_manufacturer_model function in another task, I would consider returning the object as a dict or string. It seems small enough not to need the complexity of a pandas Series at this point.

{'NewMeterManufacturer': manufacturer,
 'NewMeterModel': model
}

Solution 2:[2]

In your airflow.cfg file, set enable_xcom_pickling = True. This makes Airflow serialize XCom values with pickle instead of JSON, so objects like a pandas Series can be pushed.
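As a sketch: in Airflow 2.x this flag lives in the [core] section of airflow.cfg (it can also be set via the AIRFLOW__CORE__ENABLE_XCOM_PICKLING environment variable). Be aware that unpickling is unsafe if XCom values can come from untrusted sources.

```ini
[core]
# Serialize XCom values with pickle instead of JSON, so objects like
# pandas Series can be pushed. Unpickling untrusted data is unsafe.
enable_xcom_pickling = True
```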

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2: Igor Couto