How to query the state store in the Kafka Streams DSL to implement consumer idempotency
I'm working in a scenario where duplicate messages could arrive at a consumer (a KStream application). To use a typical case, let's suppose the message is an OrderCreatedEvent and the KStream has logic that processes the order. The event has an order-id that lets me identify duplicate messages.
What I want to do is:
1) Add every order to a persistent state store
2) When processing the message in the KStream, query the state store to check whether the message has already been received, and do nothing in that case.
val persistentKeyValueStore = Stores.persistentKeyValueStore("order-store")
val stateStore: Materialized<Int, Order, KeyValueStore<Bytes, ByteArray>> =
    Materialized.`as`<Int, Order>(persistentKeyValueStore)
        .withKeySerde(intSerde)
        .withValueSerde(orderSerde)

val orderTable: KTable<Int, Order> = input.groupByKey(Serialized.with(intSerde, orderSerde))
    .reduce({ _, y -> y }, stateStore)

var orderStream: KStream<Int, Order> = ...
orderStream.filter { XXX }
    .map { key, value ->
        processingLogic()
        KeyValue(key, value)
    }...
In the filter { XXX } bit I would like to query the state store to check whether the order id is there (let's assume the order id is used as the key of the key-value store), filtering out orders that were already processed (present in the state store).
My first question is: how can I query a state store in the KStream DSL, e.g. inside the filter operation?
Second question: in this case, how can I handle the arrival of a new (not previously processed) message? If the KTable persists the order to the state store BEFORE the orderStream KStream executes, the message would already be in the store. Orders should be added only after processing has completed. How can I do this? I probably shouldn't be using a KTable for it but something like:
orderStream.filter { keystore.get(key) == null }
    .map { key, value ->
        processingLogic()
        KeyValue(key, value)
    }
    .foreach { key, value ->
        keystore.put(key, value)
    }
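The ordering concern in the second question can be sketched without Kafka at all. The snippet below is a minimal model, not Kafka Streams code: `ProcessedOrders`, `processOnce` and `demoProcessOnce` are hypothetical names, and a plain in-memory map stands in for the persistent state store. It records an order id only after the processing function returns, so a failed attempt stays eligible for a retry.

```kotlin
// Hypothetical stand-in for the persistent state store: a plain in-memory map.
class ProcessedOrders {
    private val store = mutableMapOf<Int, String>()

    // Runs `process` only for order ids not seen before. The id is recorded
    // AFTER `process` returns, so a failed attempt is not marked as done.
    fun processOnce(orderId: Int, order: String, process: (String) -> Unit): Boolean {
        if (store.containsKey(orderId)) return false // duplicate: skip
        process(order)                               // may throw
        store[orderId] = order                       // record only on success
        return true
    }
}

// Usage sketch: the first delivery fails, the redelivery is processed,
// and a later duplicate is skipped.
fun demoProcessOnce(): List<String> {
    val orders = ProcessedOrders()
    val log = mutableListOf<String>()
    var failNext = true
    val process: (String) -> Unit = { order ->
        if (failNext) {
            failNext = false
            throw IllegalStateException("transient failure")
        }
        log += "processed $order"
    }
    try {
        orders.processOnce(1, "order-1", process)
    } catch (e: IllegalStateException) {
        log += "failed"
    }
    if (!orders.processOnce(1, "order-1", process)) log += "skipped"
    if (!orders.processOnce(1, "order-1", process)) log += "skipped"
    return log
}
```

Calling `demoProcessOnce()` returns the steps in order: the failed first attempt, the successful redelivery, and the skipped duplicate. The check and the write live in one function here, which is the property the `filter`/`foreach` split above does not give by itself.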
Solution 1:[1]
Following Matthias' suggestions, I implemented it like this:
DeduplicationTransformer
package com.codependent.outboxpattern.operations.stream

import com.codependent.outboxpattern.account.TransferEmitted
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.slf4j.LoggerFactory

@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private val logger = LoggerFactory.getLogger(javaClass)
    private lateinit var dedupStore: KeyValueStore<String, String>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, String>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            logger.warn("****** Detected duplicated transfer {}", key)
            null
        } else {
            logger.warn("****** Registering transfer {}", key)
            dedupStore.put(key, key)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}
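The behaviour of the transformer above can be illustrated with a small plain-Kotlin model. This is only a sketch: `InMemoryDeduplicator` and `deduplicate` are hypothetical names, and a mutable map stands in for the `dedup-store` KeyValueStore. Returning null plays the role it has in `Transformer.transform`, where a null result means no record is forwarded downstream.

```kotlin
// Plain-Kotlin model of the transformer's logic; the map is a hypothetical
// stand-in for the "dedup-store" KeyValueStore.
class InMemoryDeduplicator<V> {
    private val seen = mutableMapOf<String, String>()

    // Mirrors transform(): null for a key seen before, the pair otherwise.
    fun transform(key: String, value: V): Pair<String, V>? =
        if (seen[key] != null) {
            null
        } else {
            seen[key] = key
            key to value
        }
}

// Feeds events through the deduplicator in order and keeps only the
// records that would be forwarded downstream.
fun deduplicate(events: List<Pair<String, String>>): List<Pair<String, String>> {
    val dedup = InMemoryDeduplicator<String>()
    return events.mapNotNull { (key, value) -> dedup.transform(key, value) }
}
```

For example, `deduplicate(listOf("t1" to "a", "t2" to "b", "t1" to "a"))` keeps the first `t1` and `t2` records and drops the repeated `t1`. The model covers a single store only; in Kafka Streams the store is local to a task, so duplicates need to carry the same key to reach the same store.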
FraudKafkaStreamsConfiguration
const val DEDUP_STORE = "dedup-store"

@Suppress("UNCHECKED_CAST")
@EnableBinding(TransferKafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo(value = ["outputKo", "outputOk"])
    fun process(@Input("input") input: KStream<String, TransferEmitted>): Array<KStream<String, *>>? {
        val fork: Array<KStream<String, *>> = input
            .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
            .branch(Predicate { _: String, value -> fraudDetectionService.isFraudulent(value) },
                Predicate { _: String, value -> !fraudDetectionService.isFraudulent(value) }) as Array<KStream<String, *>>
        ...
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | codependent |