Error in Airflow 2 even though my tasks have actually completed? ERROR - Could not serialize the XCom value into JSON
Hi all, my DAG actually runs fine and all the outputs are produced, but Airflow's UI never marks the tasks as success; instead they fail with the error below. Reading online, I have come across these two suggestions:
do_xcom_push=False
and that xcom_push will be deprecated in Airflow version 2.0.
I'm just not sure how to actually set this up. Can anyone share any insight?
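From what I can tell, do_xcom_push is an ordinary operator argument in Airflow 2, so presumably it would be passed straight to the PythonOperator, something like this (untested sketch, using the same task as in my full code below):

format_function = python_operator.PythonOperator(
    task_id='format_function',
    python_callable=format_data,
    do_xcom_push=False,  # skip pushing the callable's return value to XCom
)

Is that the right way to set it up?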
Full Code:
import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(
        create_bqstorage_client=True,
    )
)
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},
                }
def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''
    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model
                      })
gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[],
    write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')
default_dag_args = {'owner': 'ME',
                    'start_date': datetime.datetime(2021, 12, 16),
                    }
with models.DAG('Test_Dag_V1',
                schedule_interval=None,  # '45 10 * * *',
                default_args=default_dag_args) as dag:

    format_function = python_operator.PythonOperator(
        task_id='format_function',
        python_callable=format_data
    )

    map_manufacturer_model_function = python_operator.PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model,
        op_kwargs={'s': 'stringValue'}
    )

    format_function >> map_manufacturer_model_function
Error in Airflow:
[2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
    value = XCom.serialize_value(value)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Series is not JSON serializable
Solution 1:[1]
If your goal is to use the output of the map_manufacturer_model function in another task, I would consider returning it as a dict or string. The result seems small enough not to need the complexity of being turned into a Series at this point, and a plain dict of strings is JSON-serializable, so XCom can store it:
{'NewMeterManufacturer': manufacturer,
 'NewMeterModel': model}
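A sketch of what the task callable could look like (same logic as yours, only the return type changed; if you still need the DataFrame .apply() step, it could wrap the result back into a Series, e.g. lambda x: pd.Series(map_manufacturer_model(x))):

def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''
    # A plain dict is JSON-serializable, so the automatic xcom_push no longer fails
    return {'NewMeterManufacturer': manufacturer,
            'NewMeterModel': model}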
Solution 2:[2]
In your airflow.cfg file, set enable_xcom_pickling = True (it lives in the [core] section). Airflow will then pickle XCom values instead of JSON-encoding them, so objects such as a pandas Series can be stored.
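For example (assuming the default config layout; the same flag can also be set through the AIRFLOW__CORE__ENABLE_XCOM_PICKLING environment variable):

[core]
enable_xcom_pickling = True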
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 |
Solution 2 | Igor Couto