Spring Cloud Stream with Kafka Streams binder: how to set `trusted.packages` for a stream processor (as opposed to a consumer or producer)
I have a simple stream processor (not a consumer/producer) that looks like this (Kotlin):
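import java.util.function.Function
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.springframework.context.annotation.Bean

@Bean
fun processFoo(): Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input ->
        input.map { key, value ->
            println("\nPAYLOAD KEY: ${key.name}\n")
            println("\nPAYLOAD value: ${value.address}\n")
            val output = FooAddressPlus()
            output.address = value.address
            output.name = value.name
            // Braces are needed to interpolate a property; "$value.name" would print value.toString() followed by ".name"
            output.plus = "${value.name}-${value.address}"
            KeyValue(key, output)
        }
    }
}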
The classes `FooName`, `FooAddress`, and `FooAddressPlus` are in the same package as the processor.
Here’s my config file:
spring.cloud.stream.kafka.binder:
  brokers: localhost:9093
spring.cloud.stream.function.definition: processFoo
spring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor
spring.cloud.stream.bindings.processFoo-in-0:
  destination: foo.processor
spring.cloud.stream.bindings.processFoo-out-0:
  destination: foo.processor.out
spring.cloud.stream.kafka.streams.binder:
  deserializationExceptionHandler: logAndContinue
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 1000
I get this error when running the processor:
The class '<here_comes_package>.FooAddress' is not in the trusted packages: [java.util, java.lang].
If you believe this class is safe to deserialize, please provide its name.
If the serialization is only done by a trusted source, you can also enable trust all (*).
What is the best way to trust all packages when using a Kafka Streams binder stream processor (a stream processor, not a consumer/producer)?
Thanks a lot!
Solution 1:[1]
You can set arbitrary configuration properties (Kafka Streams properties, properties used by the serdes, etc.) under `...binder.configuration`:
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              spring.json.trusted.packages: '*'
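
If trusting every package is broader than you need, the same property also accepts a comma-separated list of package names. A minimal sketch, assuming the classes live in a hypothetical package `com.example.foo` (a placeholder, since the question does not show the real package name):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # Hypothetical package name; replace it with the package that
              # contains FooName, FooAddress and FooAddressPlus.
              spring.json.trusted.packages: 'com.example.foo'
```

This keeps the deserializer restricted to classes you expect, which matters if the input topic can be written to by sources you do not control.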
Solution 2:[2]
I am using the Spring Cloud Stream binder for Kafka; adding the properties below resolved the deserialization issue with trusted packages:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Andras Hatvani |
Solution 2 | Riaz |