Jackson/Kafka LocalDateTime serialization not working properly
We have a very strange, intermittent issue with Kafka and Jackson. We have a DTO with four date fields: startDate and endDate on the DTO itself, plus an embedded metadata class with two more date fields, createdTime and updatedTime. All of them use a user-defined MongoDate class as their type.
@AllArgsConstructor
@Getter
@Setter
public static class MongoDate {
    @JsonProperty("$date")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = OfferAllocationConstants.ISO_DATE_FORMAT)
    private LocalDateTime date;
}
where ISO_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
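As a minimal sketch of what this pattern yields, the snippet below applies it with java.time only (no Jackson involved). The class name IsoDatePatternDemo is made up for illustration. Because the 'Z' is quoted, it is a literal character: the LocalDateTime is printed as-is with a Z appended, without any time zone conversion, and SSS truncates the nanoseconds to milliseconds.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class IsoDatePatternDemo {
    // Same pattern as the ISO_DATE_FORMAT constant quoted in the question.
    static final String ISO_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern(ISO_DATE_FORMAT, Locale.ROOT);

    // Formats a LocalDateTime the way the expected "$date" strings look.
    static String format(LocalDateTime value) {
        return value.format(FORMATTER);
    }

    public static void main(String[] args) {
        // prints 2022-04-13T00:00:00.000Z
        System.out.println(format(LocalDateTime.of(2022, 4, 13, 0, 0, 0)));
        // prints 2022-04-16T23:59:59.999Z (nanoseconds truncated to milliseconds)
        System.out.println(format(LocalDateTime.of(2022, 4, 16, 23, 59, 59, 999_999_999)));
    }
}
```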
Our DTO:
@Getter
@Setter
@JsonInclude(JsonInclude.Include.NON_NULL)
@ToString
@JsonSerialize
public class OfferAllocation {
    @Id
    @NotNull(message = "allocationId cannot be null")
    @JsonProperty("_id")
    private String allocationId;
    @NotNull(message = "offer id cannot be null")
    private String globalId;
    @NotNull(message = "customerId cannot be null")
    @HashIndexed
    private String customerId;
    @NotNull(message = "identityType cannot be null")
    private String identityType;
    @NotNull(message = "status cannot be null")
    private OfferAllocationStatus status;
    @NotNull(message = "statusHistory cannot be null")
    private StatusHistory statusHistory;
    @NotNull(message = "startDate cannot be null")
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    private MongoDate startDate;
    @NotNull(message = "endDate cannot be null")
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    private MongoDate endDate;
    @NotNull(message = "metadata cannot be null")
    private OfferAllocationMetadata metadata;
    private Map<String, String> customAttributes;

    @AllArgsConstructor
    @Getter
    @Setter
    public static class MongoDate {
        @JsonProperty("$date")
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = OfferAllocationConstants.ISO_DATE_FORMAT)
        private LocalDateTime date;
    }
}
We also have this configuration:
@Configuration
public class ApplicationConfig {
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        return mapper;
    }
}
We then send this DTO to Kafka.
The code which produces the message to Kafka:
kafkaTemplate.send(buildOfferAllocationMessage(offerAllocation, createTopic));

private Message buildOfferAllocationMessage(final OfferAllocation offerAllocation, String topic) {
    return MessageBuilder
            .withPayload(offerAllocation)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY, offerAllocation.getCustomerId())
            .setHeader(SOURCE_APPLICATION, sourceApplication)
            .build();
}
The correct format looks like this:
{
  "globalId": "foobar",
  "customerId": "foobar",
  "identityType": "foobar",
  "status": "ALLOCATED",
  "startDate": {
    "$date": "2022-04-13T00:00:00.000Z"
  },
  "endDate": {
    "$date": "2023-04-13T23:59:59.999Z"
  },
  "metadata": {
    "updatedUuid": "673e4b6b-995d-4e5b-a1c9-332e3bd998a4",
    "createdTime": {
      "$date": "2022-04-13T07:14:39.408Z"
    },
    "createdChannel": "SPARKS_STREAK",
    "offset": "22111",
    "partition": "1",
    "allocationId": "15863",
    "sourcePrefix": 13,
    "operationType": "CREATE",
    "allocationTime": {
      "$date": "2022-04-13T07:14:38.797Z"
    }
  },
  "_id": "50733124915864"
}
But sometimes it comes out in the format below, which is not expected:
{
  "globalId": "foobar",
  "customerId": "foobar",
  "identityType": "foobar",
  "status": "ALLOCATED",
  "startDate": {
    "$date": {
      "nano": 0,
      "year": 2022,
      "monthValue": 4,
      "dayOfMonth": 13,
      "hour": 0,
      "minute": 0,
      "second": 0,
      "dayOfWeek": "WEDNESDAY",
      "dayOfYear": 103,
      "month": "APRIL",
      "chronology": {
        "id": "ISO",
        "calendarType": "iso8601"
      }
    }
  },
  "endDate": {
    "$date": {
      "nano": 999999999,
      "year": 2022,
      "monthValue": 4,
      "dayOfMonth": 16,
      "hour": 23,
      "minute": 59,
      "second": 59,
      "dayOfWeek": "SATURDAY",
      "dayOfYear": 106,
      "month": "APRIL",
      "chronology": {
        "id": "ISO",
        "calendarType": "iso8601"
      }
    }
  },
  "metadata": {
    "updatedUuid": "b106b69b-c627-4a5d-969e-61807eb2d034",
    "createdTime": {
      "$date": {
        "nano": 222290000,
        "year": 2022,
        "monthValue": 4,
        "dayOfMonth": 13,
        "hour": 7,
        "minute": 3,
        "second": 46,
        "dayOfWeek": "WEDNESDAY",
        "dayOfYear": 103,
        "month": "APRIL",
        "chronology": {
          "id": "ISO",
          "calendarType": "iso8601"
        }
      }
    },
    "createdChannel": "BEAM",
    "offset": "10114378",
    "partition": "38",
    "batchId": 1244,
    "messageId": 88,
    "sourcePrefix": 10,
    "operationType": "CREATE",
    "allocationType": "SFMC_TRIGGER",
    "allocationTime": {
      "$date": {
        "nano": 557000000,
        "year": 2022,
        "monthValue": 4,
        "dayOfMonth": 13,
        "hour": 7,
        "minute": 3,
        "second": 4,
        "dayOfWeek": "WEDNESDAY",
        "dayOfYear": 103,
        "month": "APRIL",
        "chronology": {
          "id": "ISO",
          "calendarType": "iso8601"
        }
      }
    }
  },
  "_id": "66789818215641"
}
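The field names in this unexpected payload (nano, monthValue, dayOfWeek, chronology and so on) are the getter-derived bean properties of java.time.LocalDateTime. That is the shape Jackson 2.10 produces when it falls back to plain bean serialization, which is what an ObjectMapper without JavaTimeModule registered does. The sketch below only imitates that getter discovery with plain reflection to show where the names come from. It is not Jackson's implementation, and GetterPropertiesDemo is a hypothetical name.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.TreeSet;

final class GetterPropertiesDemo {
    // Collects property names from public, non-static, zero-argument getX() methods:
    // a simplified imitation of getter-based bean introspection.
    static Set<String> getterProperties(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method method : type.getMethods()) {
            String name = method.getName();
            boolean isGetter = name.startsWith("get")
                    && name.length() > 3
                    && method.getParameterCount() == 0
                    && !Modifier.isStatic(method.getModifiers())
                    && !name.equals("getClass");
            if (isGetter) {
                // getMonthValue -> monthValue
                names.add(Character.toLowerCase(name.charAt(3)) + name.substring(4));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // The printed set includes nano, year, monthValue, dayOfMonth, hour, minute,
        // second, dayOfWeek, dayOfYear, month and chronology.
        System.out.println(getterProperties(LocalDateTime.class));
    }
}
```

If the payload has this shape, the mapper that wrote it most likely did not have JavaTimeModule registered, regardless of how the ObjectMapper bean in ApplicationConfig is configured.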
We suspected this was a performance problem, but we could not reproduce it in our performance tests. It occurs seemingly at random.
We are using org.springframework.kafka.core.KafkaTemplate to produce events to kafka.
Producer configuration
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Jackson dependencies
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.10.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.10.3'
Kafka dependency
compile "org.springframework.kafka:spring-kafka:2.6.4"
Spring Boot version: 2.3.5.RELEASE
Solution 1:[1]
After a few experiments we reproduced this issue. It was caused by the org.springframework.kafka.support.serializer.JsonSerializer class configured as the value serializer. It only happens when reading from multiple topics or at higher throughput. We don't know the exact reason, but it looks like the problem was the ObjectMapper the serializer was using. Extending the class and passing in a custom ObjectMapper (configured the same way as the bean we defined) solved the issue.
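import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class CustomJsonSerializer extends JsonSerializer<OfferAllocation> {
    // One shared mapper with java.time support, built once for all serializer instances.
    public static final ObjectMapper objectmapper = createMapper();

    public static final ObjectMapper createMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        return mapper;
    }

    // Kafka instantiates the serializer through this no-arg constructor.
    public CustomJsonSerializer() {
        super(objectmapper);
    }
}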
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.CustomJsonSerializer
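An alternative that may avoid the subclass is to hand Spring's ObjectMapper bean directly to a JsonSerializer instance when building the producer factory. The following is only a sketch under assumptions, not something verified against this application: the class name KafkaProducerConfig and the KafkaTemplate<String, Object> generic types are hypothetical, and it assumes Spring Boot's KafkaProperties bean is available to carry over the spring.kafka.producer settings.

```java
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig { // hypothetical class name

    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties properties,
                                                           ObjectMapper objectMapper) {
        // Reuse the spring.kafka.producer.* settings from the application config.
        Map<String, Object> configs = properties.buildProducerProperties();
        // Serializer instances passed to the constructor are used instead of the
        // key-serializer / value-serializer class names, so the JSON serializer
        // works with the ObjectMapper bean that has JavaTimeModule registered.
        return new DefaultKafkaProducerFactory<>(
                configs,
                new StringSerializer(),
                new JsonSerializer<>(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

With this wiring the serializer is created by the application rather than by Kafka from a class name, so there is a single, explicitly configured ObjectMapper in play.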
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | TARA MISHRA |