Category "apache-kafka"

How to properly test kafkaTemplate.send() within a function in JUnit 5?

I'm learning how to write tests, especially tests that have a producer in them. I cannot post all the classes because it's HUGE (and not mine, I should just pr

kafka failed authentication due to: SSL handshake failed

I have to add encryption and authentication with SSL in Kafka. This is what I have done: Generated a certificate for each Kafka broker: keytool -keystore server

Docker - library initialization failed - unable to allocate file descriptor table - out of memory

I have been trying to run Zookeeper and Kafka in a Docker container. I got a lot of errors [error occurred during error reporting , id 0xb] and [Too many errors, abo

KTable-KTable foreign-key join not producing all messages when topics have more than one partition

See the update below for a potential workaround. Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we fou

Confluent Kafka Connector Configuration with C# / .NET

I'm trying to build a connector to go into the Confluent Kafka library. I have seen many examples in Java that use configDef to define the configuration options

Extract particular data from Kafka topic

I'm doing real-time streaming on Twitter. Is there a way to extract only the messages and certain values from a Kafka topic?

Create PyFlink DataStream Consumer from Tweets Kafka Producer in Python

I want to create a streaming Kafka consumer in PyFlink that can read tweet data after deserialization (JSON). I have PyFlink version 1.14.4 (latest version). Can I

How do I serialize non-ascii characters in pykafka?

I have a defaultdict I want to serialize as JSON, but some characters are then replaced with Unicode-escaped characters. For example, if I have string "International

Kerberos error while authenticating on Confluent Kafka

I've been trying to understand Apache Beam, Confluent Kafka and Dataflow integration with Python 3.8 and Beam SDK 2.7. The desired result is to build a pipe

Send and load an ML model over Apache Kafka

I've been looking around here and on the Internet, but it seems that I'm the first one having this question. I'd like to train an ML model (let's say something

Connect to Kafka running in Docker

I set up a single-node Kafka Docker container on my local machine as described in the Confluent documentation (steps 2-3). In addition, I also exposed Z

Kafka authentication with JAAS config

I have set up my Kafka JAAS config as an external bean in my Spring Boot application to read my configuration from my application.yaml file. But I am facing an

Get count of partitions in a Kafka topic with Scala 2.12

With Scala 2.11 and spark-streaming-kafka-0-8_2.11 I could do import org.apache.spark.streaming.kafka.KafkaCluster val params = Map[String, Object]( "bootstr

Found directory not in the form of topic-partition . Kafka's log directories (and children) should only contain Kafka topic data

[2021-04-05 07:51:32,180] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.KafkaException: Found directory /var/lib/k

Why does Kafka write null data to the database?

I am working on a NestJS project. My project gets data from a Kafka topic and writes the data to the database (MySQL). If I read hundreds of messages from Kafka

How to test Kafka OnFailure callback with JUnit?

I have the following code to send data to Kafka: @Service public class KafkaSender{ @Autowired private KafkaTemplate<String, Employee> kafkaTempla

Spark + Read kafka topic from a specific offset based on timestamp

How do I set a Spark job to pick up a Kafka topic from a specific offset based on a timestamp? Let's say that I need to get all data from a Kafka topic startin

Apache Kafka - Implementing a KTable

I am new to the Kafka Streams API and I am trying to create a KTable. I have an input topic: s-order-topic, which contains JSON-formatted messages, as shown below. { "curr

Is it possible to query a KSQL Table/Materialized view via HTTP?

I have a materialized view created using CREATE TABLE average_latency AS SELECT DEVICENAME, AVG(LATENCY) AS AVG_LATENCY FROM metrics WINDOW TUMBLING (SIZE 1 MIN

Kafka consumer is not listening from command line

I am using Kafka 1.0.0 in my project. Since yesterday, I have been unable to consume messages from the command line. At the same time, I am able to listen to the mess