Category "apache-kafka"

How to skip Kafka history data in a Flink job if a certain lag is encountered?

Sometimes we encounter lag in the Kafka consumer due to external issues. The Flink job will always consume the Kafka history (delayed data) with exactly-once semantics …
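A common way to skip a large backlog is to control the starting offsets of the Kafka source when the job is (re)started, instead of resuming from the committed offsets. A minimal sketch using Flink's KafkaSource (broker, topic, and group id are placeholders; skipping lag in the middle of a run would need custom logic):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkipBacklogJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start from "now" (or from a timestamp) rather than the committed offsets, so a
        // large backlog is skipped when the job is (re)started. Exactly-once processing
        // still applies from the chosen starting point onward.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder broker
                .setTopics("events")                     // placeholder topic
                .setGroupId("flink-consumer")            // placeholder group id
                .setStartingOffsets(OffsetsInitializer.latest())
                // or: OffsetsInitializer.timestamp(System.currentTimeMillis() - 10 * 60 * 1000L)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .print();
        env.execute("skip-backlog");
    }
}
```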

How to stop listening in a Spring Kafka consumer?

I use Spring for Apache Kafka. I'd like to stop listening to my topic and wait, to avoid an OOM. How can I do it?
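With Spring for Apache Kafka, a listener container can be paused (or fully stopped) and later resumed through the KafkaListenerEndpointRegistry. A sketch, assuming a @KafkaListener registered with id "myListener":

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerControl {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // "myListener" must match @KafkaListener(id = "myListener", ...) on the consumer.
    public void pauseListener() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null) {
            container.pause();   // stop fetching new records but keep the consumer alive
        }
    }

    public void resumeListener() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null) {
            container.resume();
        }
    }

    public void stopListener() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null) {
            container.stop();    // fully stop the container; start() brings it back later
        }
    }
}
```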

Can't establish SSL connection to Kafka after upgrading to Python 3.7

Code I have that successfully connects to Kafka with an SSL connection in Python 3.6.7 fails when using Python 3.7.3, with the error message SSL: WRONG_VERSION_NUMBER.

Kafka - broker partitions not in-sync after restart

We use 3-node Kafka clusters running 2.7.0 with quite a high number of topics and partitions. Almost all the topics have only 1 partition and a replication factor of …

Kafka MirrorMaker2 ports for inter-cluster communication

I have set up Apache MirrorMaker 3.0.0 with an active-active strategy for two Kafka clusters (named DC and DR). So a topic on DC is replicated by MirrorMaker2 as DC.<topic> …

Connection between Kafka and Spark: Failed to find data source: kafka

I am trying to link Kafka and Spark by reading data from one topic and trying to print the contents of this topic into a DataFrame, but when making the connection …
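"Failed to find data source: kafka" usually means the Kafka integration package for Structured Streaming is not on the classpath. A sketch of the read side using the Java API (broker, topic, and the exact package coordinates are assumptions and must match your Spark/Scala build):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaToConsole {
    public static void main(String[] args) throws Exception {
        // Submit with the Kafka source on the classpath, e.g.
        //   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
        // (version and Scala suffix must match your Spark build -- assumption here).
        SparkSession spark = SparkSession.builder().appName("kafka-to-console").getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
                .option("subscribe", "my-topic")                     // placeholder topic
                .load();

        // Kafka rows expose key/value as binary; cast to strings before printing.
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream()
          .format("console")
          .start()
          .awaitTermination();
    }
}
```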

Unable to connect to Kafka in Docker from Spring Boot on the host machine

I have already gone through Robin Moffatt's blog and several SO posts on the same subject, but my configuration still doesn't work. My docker-compose.yml: kafka …

How to start a process (or Kafka, to be specific) on a remote host with an Ansible playbook

How do I start ZooKeeper and a Kafka broker on a remote target with an Ansible playbook? The following commands work fine locally. Start ZooKeeper: cd /opt/kafka ./bin/zooke…

Structured Streaming to Save JSON to HDFS

My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. I am able to save JSON to HDFS, but it saves the JSON st…
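A frequent variant of this problem is that the raw Kafka value ends up on HDFS as a single quoted string column. The usual fix is to parse the value with from_json against an explicit schema before writing. A sketch, assuming df is the streaming Dataset<Row> read from Kafka and that the payload matches a hypothetical two-field schema:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class JsonToHdfs {
    // df is assumed to be a streaming Dataset<Row> read with format("kafka").
    public static void writeAsJson(Dataset<Row> df) throws Exception {
        // Hypothetical schema for the payload; replace with the real fields.
        StructType schema = new StructType()
                .add("id", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType);

        Dataset<Row> parsed = df
                .selectExpr("CAST(value AS STRING) AS json")   // Kafka value arrives as bytes
                .select(from_json(col("json"), schema).as("data"))
                .select("data.*");                             // one column per JSON field

        parsed.writeStream()
                .format("json")                                // proper JSON files, not quoted strings
                .option("path", "hdfs:///data/events")          // placeholder output path
                .option("checkpointLocation", "hdfs:///checkpoints/events")
                .start();
    }
}
```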

No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>'

I use this Kafka configuration with Spring Cloud and Spring Boot 2.6.6: @Configuration @RefreshScope public class KafkaProducerConfig { @Bean(name = "nativeP…
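The "No qualifying bean of type ProducerFactory<Object, Object>" error typically means a KafkaTemplate is being created (or auto-configured) without a compatible ProducerFactory bean to wire into it. A minimal sketch of a configuration that defines both explicitly (bean names, generic types, and the broker address are assumptions):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Plain String/String factory; serializers and bootstrap servers are placeholders.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // A KafkaTemplate wired to the factory above; without a matching factory bean,
    // template creation falls back to looking for ProducerFactory<Object, Object> and fails.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }
}
```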

How to get Kafka offset data for a specified timestamp

I've tried to get the offset from a Kafka topic based on a timestamp; when I tried to run it, it threw a null pointer error. Map<TopicPartition, Long> timest…
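A likely cause of the NullPointerException is that Consumer#offsetsForTimes maps a partition to a null OffsetAndTimestamp when no record has a timestamp at or after the requested one, so the result must be null-checked. A sketch (broker, topic, and partition are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "ts-lookup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long timestamp = System.currentTimeMillis() - Duration.ofHours(1).toMillis();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            timestampsToSearch.put(tp, timestamp);

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestampsToSearch);

            // offsetsForTimes returns null for a partition when no record has a
            // timestamp >= the requested one -- a frequent source of the NPE.
            OffsetAndTimestamp oat = result.get(tp);
            if (oat != null) {
                consumer.assign(Collections.singleton(tp));
                consumer.seek(tp, oat.offset());
            } else {
                System.out.println("No offset at or after " + timestamp + " for " + tp);
            }
        }
    }
}
```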

How to query the state store in the Kafka Streams DSL to implement consumer idempotency

I'm working in a scenario where duplicated messages could arrive at a consumer (a KStream application). To take the typical case, let's suppose it's an OrderCrea…
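One common pattern is to attach a key-value state store to the topology and only forward a record the first time its id is seen. A sketch using transformValues with a persistent store (topic name, store name, and using the record key as the dedup id are assumptions; in practice the store also needs a retention/cleanup strategy):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class DedupTopology {
    private static final String STORE = "seen-orders";

    public static KStream<String, String> dedup(StreamsBuilder builder) {
        // Persistent store keyed by the id we deduplicate on (here: the record key).
        StoreBuilder<KeyValueStore<String, Long>> store =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE),
                        Serdes.String(), Serdes.Long());
        builder.addStateStore(store);

        KStream<String, String> orders = builder.stream("orders"); // placeholder topic

        // Forward a record only the first time its key is seen; duplicates become null
        // and are filtered out below.
        ValueTransformerWithKeySupplier<String, String, String> dedupSupplier =
                () -> new ValueTransformerWithKey<String, String, String>() {
                    private KeyValueStore<String, Long> seen;

                    @SuppressWarnings("unchecked")
                    @Override
                    public void init(ProcessorContext context) {
                        seen = (KeyValueStore<String, Long>) context.getStateStore(STORE);
                    }

                    @Override
                    public String transform(String key, String value) {
                        if (seen.get(key) != null) {
                            return null; // already processed
                        }
                        seen.put(key, System.currentTimeMillis());
                        return value;
                    }

                    @Override
                    public void close() { }
                };

        return orders.transformValues(dedupSupplier, STORE)
                     .filter((k, v) -> v != null);
    }
}
```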

How to monitor the amount of messages in a Kafka topic per day?

I have a Kafka cluster with a topic that receives thousands of messages a day, and I want to see how many messages went into the topic per day. I'm using the JMX exporter …
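Two approaches are common here: graphing a daily rate over the broker's BrokerTopicMetrics MessagesInPerSec MBean via the JMX exporter, or periodically sampling the topic's log-end offsets and diffing consecutive samples. A sketch of the offset-based count (broker and topic are placeholders; the count ignores records removed by retention or compaction):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TopicMessageCount {
    // Sums the log-end offsets of every partition; sampling this once a day and
    // diffing consecutive samples approximates "messages per day".
    public static long totalEndOffset(String bootstrap, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            return end.values().stream().mapToLong(Long::longValue).sum();
        }
    }
}
```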

Kafka Streams - Concatenate Predicates based on a dynamic number of conditions

I'm a bit new to Java, so I would appreciate advice on dealing with multiple conditions in Kafka Predicates. I have the following code, in which I'm able to have dynamic …
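Kafka Streams' Predicate<K, V> has no and()/or() combinators like java.util.function.Predicate, but a dynamic list of conditions can be folded into a single composite predicate. A small sketch (the helper class is hypothetical):

```java
import java.util.List;
import org.apache.kafka.streams.kstream.Predicate;

public final class Predicates {

    // True only if every condition in the list matches the key/value pair.
    public static <K, V> Predicate<K, V> allOf(List<Predicate<K, V>> conditions) {
        return (key, value) -> conditions.stream().allMatch(p -> p.test(key, value));
    }

    // True if at least one condition matches.
    public static <K, V> Predicate<K, V> anyOf(List<Predicate<K, V>> conditions) {
        return (key, value) -> conditions.stream().anyMatch(p -> p.test(key, value));
    }
}
```

The composite can then be passed wherever a single Predicate is expected, e.g. stream.filter(Predicates.allOf(conditions)).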

Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found

I have a topic that will eventually have lots of different schemas on it. For now it just has the one. I've created a Connect job via REST like this: { "name" …

Flink Python DataStream API Kafka Producer Sink Serialization

I'm trying to read data from one Kafka topic and write to another after some processing. I'm able to read the data and process it; when I try to write it to …

Kafka producer config: Why request.timeout.ms should be larger than replica.lag.time.max.ms

From the Kafka documentation (https://kafka.apache.org/11/documentation.html#producerconfigs): The configuration controls the maximum amount of time the client …

java.lang.RuntimeException: Failed to resolve Oracle database version

I am using the Debezium Oracle connector in Kafka Connect. While starting the connector I am getting the error below: java.lang.RuntimeException: Failed to resolve Oracle database version …

Helm range YAML template for Kafka topics

I am new to Helm and I am trying to generate different topics for Kafka with a range function, so as not to have a YAML file for each topic. I have different topics (t…

This error handler cannot process 'SerializationException's directly;

The image below is my topic. Every once in a while, a value other than empty or JSON is returned. If it is null, it throws an error as below and enters the …
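The usual remedy in Spring Kafka is to wrap the real deserializers in ErrorHandlingDeserializer, so a poison record becomes a deserialization failure that the container's error handler (for example, DefaultErrorHandler with a DeadLetterPublishingRecoverer) can deal with instead of looping on the SerializationException. A sketch of the consumer factory properties (broker, group id, and the JSON value type are placeholders):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class ConsumerFactoryConfig {

    // Wrap the real deserializers so a broken (non-JSON) record surfaces as a failed
    // record for the error handler, rather than an endless SerializationException loop.
    public static DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```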