Category "apache-kafka-streams"

Avro Definition for Custom Aggregator

I have a code where I am aggregating the data from Kafka stream via: StreamsBuilder streamsBuilder = new StreamsBuilder(); streamsBuilder.table(AppConfigs.t

How commit interval & cache max buffer behave in a sub-topology with multiple state store?

Taken from the book: Mastering Kafka Streams and ksqlDB Kafka Streams uses a depth-first strategy when processing data. When a new record is received, it is ro

How to query the state store in the Kafka Streams DSL to implement consumer idempotency

I'm working in an scenario where duplicated messages could arrive at a consumer (a KStream application). To use the typical case let's suppose it's an OrderCrea

Kafka streams - Concatenate Predicate based on dynamic number of conditions

I'm a bit new in Java so I would appreciate advice to deal with multiple conditions in Kafka Predicates. I've the following code which I'm able to have dynamic

KTable-KTable foreign-key join not producing all messages when topics have more than one partition

See Update below to show potential workaround Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we fou

Apache Kafka - Implementing a KTable

I am new to Kafka Streams API and I am trying to create a KTable. I have an input topic: s-order-topic, which is a json format message, as shown below. { "curr

Is it possible to query a KSQL Table/Materialized view via HTTP?

I have a materialized view created using CREATE TABLE average_latency AS SELECT DEVICENAME, AVG(LATENCY) AS AVG_LATENCY FROM metrics WINDOW TUMBLING (SIZE 1 MIN

Complex aggregation

I have data in a topic that needs to be counted at multiple levels and all code and articles only mention the word count example. An example of the data would

Why does co-partitioning of two Kstreams in kafka require same number of partitions for both the streams?

I wanted to know why does co-partitioning of two Kstreams in kafka require same number of partitions for both the streams as is given in the documentation in be

Streams Processor key-value types

I'm trying to implement a custom topology processing step implementing the Processor interface and then adding an instance of my custom processor to the topolog

Kafka infinite loop of error "SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation"

I'm having Kafka Consumer group of applications (10 instances) written in Java which uses Spring Cloud Stream. Consumer application is deployed in AWS Kubernete

Error sending fetch request (sessionId=1175648978, epoch=189) to node 53: org.apache.kafka.common.errors.DisconnectException

We have a topic with 100 partitions and the load is millions of records per hour. We ran into the problem whenever we deploy a new version of stream-processor

Sliding window using Faust

Does anyone know how to implement a sliding window using Faust? The idea is to count the occurances of a key in a 10, 30, 60, and 300s window, but we need that

Sliding window using Faust

Does anyone know how to implement a sliding window using Faust? The idea is to count the occurances of a key in a 10, 30, 60, and 300s window, but we need that

Unable to deserialize Kafka stream to pojo. Could not find class specified in writer's schema

Getting exception while reading from kafka topic: Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for

How to write a KafkaAvro Serde for GenericData.Record

I'm using Kafka 0.10.2 and Avro for the serialization of my messages, both for the key and for the value data. Now I would like to use Kafka Streams but I'm stu

Why retention.ms of Kaka Streams repartition topic is set to -1 by default? Isn't this infinitely retain messages in repartition topic?

I think it's related to the below links, but I don't understand. https://issues.apache.org/jira/browse/KAFKA-6535 https://issues.apache.org/jira/browse/KAFKA-61