How can an org.apache.kafka.connect.data.Decimal stored in an Avro file be converted to a Python type?

I am trying to interpret an Avro record stored by Debezium in Kafka, using Python. One of its fields has this schema:

{
  "name": "id",
  "type": {
    "type": "bytes",
    "scale": 0,
    "precision": 64,
    "connect.version": 1,
    "connect.parameters": {
      "scale": "0"
    },
    "connect.name": "org.apache.kafka.connect.data.Decimal",
    "logicalType": "decimal"
  }
}

I am not sure which Python 3 type this corresponds to. How can this value be deserialized?

Thanks in advance!



Solution 1:[1]

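org.apache.kafka.connect.data.Decimal is serialized as the byte representation of the unscaled integer (big-endian two's complement); when those bytes are rendered as JSON, they appear as a base64 string. To convert such a value to a Decimal, decode the base64 string to bytes, turn the bytes into a signed integer, and then scale it by the parameters.scale value.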

This schema:

{
  "type": "bytes",
  "name": "org.apache.kafka.connect.data.Decimal",
  "version": 1,
  "parameters": {
    "scale": "9",
    "connect.decimal.precision": "38"
  },
  "field": "amount"
}

A value with this schema can be converted with the following snippet:

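import base64
import decimal

ctx = decimal.Context(prec=38)  # connect.decimal.precision = 38
# signed=True: the bytes are two's complement, so negative values decode correctly
unscaled = int.from_bytes(base64.b64decode("GZ6ZFQvYpA=="), byteorder='big', signed=True)
result = ctx.create_decimal(unscaled).scaleb(-9, ctx)  # scale = 9, applied within ctx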
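The same steps can be wrapped in a small helper that reads the scale from a Connect schema like the one above. This is only a sketch: the name `decode_connect_decimal` is hypothetical, it assumes the value arrives as a base64 string (as in JSON output), and it builds the Decimal from a string, which is exact and does not depend on the context precision.

```python
import base64
import decimal


def decode_connect_decimal(encoded, schema):
    """Hypothetical helper: base64-encoded Connect Decimal -> decimal.Decimal.

    `schema` is a dict shaped like the Connect schema above; only
    schema["parameters"]["scale"] is used.
    """
    scale = int(schema["parameters"]["scale"])  # Connect stores the scale as a string
    raw = base64.b64decode(encoded)
    # The bytes hold a big-endian two's-complement unscaled integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Constructing from a string is exact: no context rounding is applied.
    return decimal.Decimal(f"{unscaled}E{-scale}")


schema = {
    "type": "bytes",
    "name": "org.apache.kafka.connect.data.Decimal",
    "version": 1,
    "parameters": {"scale": "9", "connect.decimal.precision": "38"},
    "field": "amount",
}
print(decode_connect_decimal("GZ6ZFQvYpA==", schema))  # 7211254.738049188
```

If the value is read straight from a binary Avro file rather than from JSON, it is already raw bytes, so the `b64decode` step would be skipped.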

Solution 2:[2]

If you look at

https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java

public static byte[] fromLogical(Schema schema, BigDecimal value) {
    if (value.scale() != scale(schema))
        throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
    return value.unscaledValue().toByteArray();
}

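As you can see, it uses BigDecimal and stores the unscaled value via BigInteger.toByteArray(), which is the big-endian two's-complement representation. Python's equivalent of Java's BigDecimal is decimal.Decimal (see the Stack Overflow question "What is the python for Java's BigDecimal?").

So decimal.Decimal is the type to look for in this case.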
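Since `fromLogical` writes `value.unscaledValue().toByteArray()`, and `BigInteger.toByteArray()` returns the minimal big-endian two's-complement byte array, the conversion can be mirrored in Python with `decimal.Decimal` plus `int.to_bytes`/`int.from_bytes`. The sketch below is not part of any library: `from_logical` and `to_logical` are hypothetical names that echo the Java method above and its inverse.

```python
import decimal


def from_logical(value, scale):
    """Hypothetical mirror of Decimal.fromLogical: Decimal -> unscaled bytes."""
    sign, digits, exponent = value.as_tuple()
    # Like the Java check, reject values whose scale differs from the schema's.
    if exponent != -scale:
        raise ValueError("Decimal has mismatching scale value for given Decimal schema")
    unscaled = int("".join(map(str, digits)))
    if sign:
        unscaled = -unscaled
    # Minimal length, as BigInteger.toByteArray() produces: enough bytes for
    # the magnitude bits plus one sign bit.
    bits = unscaled.bit_length() if unscaled >= 0 else (~unscaled).bit_length()
    return unscaled.to_bytes(bits // 8 + 1, byteorder="big", signed=True)


def to_logical(data, scale):
    """Inverse direction: unscaled two's-complement bytes -> Decimal."""
    unscaled = int.from_bytes(data, byteorder="big", signed=True)
    return decimal.Decimal(f"{unscaled}E{-scale}")


encoded = from_logical(decimal.Decimal("-20.16"), 2)
print(encoded.hex())             # f820
print(to_logical(encoded, 2))    # -20.16
```

The round trip shows that nothing is lost: the bytes carry only the unscaled integer, and the scale comes from the schema.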

Part 2 - Deserialization

For the deserialization part, I need more information to update the answer: how are you deserializing the other fields right now?

Solution 3:[3]

When I used the other answer for a negative number, it gave the wrong result. For example, the numeric value -20.16 is converted to "+CA=" by Debezium on its way to Kafka.

I found a solution at the link below and changed it like this:

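import base64
import decimal

def big_decimal_to_decimal(big_decimal, scale, precision):
    # Decode the base64 text into the raw big-endian bytes
    bytes_val = base64.b64decode(big_decimal)
    # Read the bytes as an unsigned integer first
    intval = int.from_bytes(bytes_val, byteorder='big')
    # If the sign bit (0x80) of the first byte is set, the value is negative:
    # subtract 2**(8 * len(bytes_val)) to get its two's-complement value
    if bytes_val[0] & 0x80:
        intval -= 1 << (8 * len(bytes_val))
    # Apply the scale as a Decimal (not a float) using the given precision
    ctx = decimal.Context(prec=precision)
    return ctx.create_decimal(intval).scaleb(-scale, ctx)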

Link: Decode base64 encoded byte array to (negative) decimal value (Java to Python)
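Handling negative values comes down to the two's-complement rule: if the top bit of the first byte is set, subtract 2^(8n) for an n-byte value. For "+CA=" the bytes are F8 20, so 0xF820 = 63520 and 63520 - 65536 = -2016, which is -20.16 at scale 2. The sketch below (the helper name `twos_complement_from_bytes` is hypothetical) applies that rule by hand and compares it with Python's built-in `int.from_bytes(..., signed=True)`, which does the same thing in one call.

```python
import base64


def twos_complement_from_bytes(raw):
    """Hypothetical helper: manual two's-complement decoding of big-endian bytes."""
    value = int.from_bytes(raw, byteorder="big")  # unsigned reading
    if raw and raw[0] & 0x80:                     # sign bit of the most significant byte
        value -= 1 << (8 * len(raw))              # subtract 2**(8 * n)
    return value


raw = base64.b64decode("+CA=")
print(raw.hex())                                           # f820
print(twos_complement_from_bytes(raw))                     # -2016
print(int.from_bytes(raw, byteorder="big", signed=True))   # -2016
```

Testing the sign bit with 0x80 matters: a mask such as 0x70 would also flag positive first bytes like 0x19 as negative.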

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Tom C
Solution 3 OneCricketeer