json.loads fails for nested JSON in Kinesis Firehose stream
I am trying to stream CloudWatch metrics to S3 using Kinesis Firehose, with a Lambda Python function to transform the data. My main issue is that json.loads fails on the nested JSON payload. I have tried multiple ways to get it working but have not succeeded. Pasting all the debug output below.
import base64
import json

print('Loading function')

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')
        payload_obj = json.loads(payload)

        # Do custom processing on the payload here
        payload_obj['event_timestamp'] = int(payload_obj['timestamp'] / 1000)
        print(payload_obj)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload_obj, default=str).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
Error is:
[ERROR] JSONDecodeError: Extra data: line 2 column 1 (char 373)
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 13, in lambda_handler
payload_obj = json.loads(payload)
File "/var/lang/lib/python3.9/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/var/lang/lib/python3.9/json/decoder.py", line 340, in decode
raise JSONDecodeError("Extra data", s, end)
Payload is:
Each record in the decoded payload is a JSON object like the ones below.
Record 1:
{
"metric_stream_name": "timestamp-day-partition-parquet",
"account_id": "123456",
"region": "us-east-1",
"namespace": "AWS/RDS",
"metric_name": "ForwardingMasterOpenSessions",
"dimensions": {
"DBClusterIdentifier": "aurora-mysql-testbox",
"Role": "WRITER"
},
"timestamp": 1646884680000,
"value": {
"max": 0,
"min": 0,
"sum": 0,
"count": 1
},
"unit": "Count"
}
Record 2:
{
"metric_stream_name": "atlas-timestamp-day-partition-parquet",
"account_id": "123456",
"region": "us-east-1",
"namespace": "AWS/RDS",
"metric_name": "Aurora_pq_request_not_chosen_update_delete_stmts",
"dimensions": {
"DBInstanceIdentifier": "test-aurora-mysql-sandbox-reader-1"
},
"timestamp": 1646884680000,
"value": {
"max": 0,
"min": 0,
"sum": 0,
"count": 1
},
"unit": "Count"
}
Solution 1:[1]
The error JSONDecodeError("Extra data", s, end) indicates that there are multiple JSON objects in the payload, which you've already noted as Record 1 and Record 2. json.loads() cannot parse multiple concatenated JSON objects, hence the "extra data" error.
See this Stack Overflow post for more details: Python json.loads shows ValueError: Extra data
An answer from that post suggests wrapping the JSON objects in an array and then parsing the array with json.loads():
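The failure is easy to reproduce outside of Lambda; a minimal sketch concatenating two JSON objects the way the decoded Firehose payload does (the tiny objects here are stand-ins for the real metric records):

```python
import json

# Two JSON documents back to back, as Firehose batches them into one record
two_objects = '{"a": 1}\n{"b": 2}'

try:
    json.loads(two_objects)
except json.JSONDecodeError as e:
    # json.loads parsed the first object, then hit the second one
    print(e.msg)  # Extra data
```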
payload = base64.b64decode(record['data']).decode('utf-8')
# The objects are newline-separated, so insert commas before wrapping in an array
payload_list = json.loads('[' + payload.replace('}\n{', '},{') + ']')
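Alternatively, since each metric arrives as its own JSON object separated by newlines, you can parse the payload line by line instead of rebuilding an array. A minimal sketch; the payload string here is an assumption standing in for base64.b64decode(record['data']).decode('utf-8'):

```python
import json

# Assumed stand-in for the decoded Firehose record data:
# two newline-delimited metric objects, compacted to one line each.
payload = (
    '{"metric_stream_name": "a", "timestamp": 1646884680000}\n'
    '{"metric_stream_name": "b", "timestamp": 1646884740000}\n'
)

# Parse each non-empty line as its own JSON object
metrics = [json.loads(line) for line in payload.splitlines() if line.strip()]

# Apply the same custom processing per metric
for m in metrics:
    m['event_timestamp'] = int(m['timestamp'] / 1000)

print(len(metrics))                    # 2
print(metrics[0]['event_timestamp'])   # 1646884680
```

This avoids any string surgery on the payload and works regardless of how many records Firehose batched together.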
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Andrew Nguonly