Is this the right Kafka consumer config for this Kafka setup?

My Kafka producer is producing messages at a rate of about 350 MB per 30 seconds.

Kafka Setup:

--> 1 Zookeeper instance

--> 3 Kafka Brokers

--> 1 Java Producer

--> 1 Java Consumer

This is how I created the topic and its partitions:

bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test

The rest of the configuration is as follows.

Producer Code:

KeyedMessage<String, byte[]> publishData = new KeyedMessage<String, byte[]>(this.topic, data);
producer.send(publishData);

Here, data is a byte[] of length 5000.

Producer Config:

batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092

I can see my producer working just fine. The problem is with the consumer. Even allowing for some lag, I do not see my messages being consumed (and eventually processed and inserted into a DB) at anything like the rate they are produced. Also, I ran a few tests on the consumer and found that not all of the messages are consumed. Not sure why :(

Consumer Code:

 public class FlowConsumer {
    private final String topic;
    private final ExecutorService threadPool;
    private final ConsumerConnector consumer;
    private static AppProperties appProperties;
    private final ExecutorService processDataThreadPool;

    public FlowConsumer() throws Exception {
        /**
         * Load properties configuration for flowLog4j.properties.
         */
        appProperties = AppProperties.loadConfiguration();

        /** Assign the flow-topic.. */
        this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
        logger.fatal("Topic : "+topic);

        /** Initialize the thread pool to consume kafka byte[] streams.. */
        this.threadPool = Executors.newFixedThreadPool(20);

        /** Initialize the thread pool for processing kafka byte[] messages.. */
        this.processDataThreadPool = Executors.newFixedThreadPool(100);

        /** Fetch the Consumer Config, by reading the Flow.properties file.. */
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));

        logger.fatal("Consumer : "+consumer);

        //new Thread(new Consumer()).start();
        threadPool.submit(new Consumer());
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (threadPool != null) threadPool.shutdown();
        if (processDataThreadPool!= null) processDataThreadPool.shutdown();
    }

    private class Consumer implements Runnable {

        public Consumer() {
            logger.fatal("Started Consumer Thread!");
        }

        @Override
        public void run() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                    processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FlowConsumer consumer = new FlowConsumer();

        /*try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        consumer.shutdown();*/
    }
}

Consumer Config:

group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000

Question 1:

For 3 brokers, can/should I create more than 3 partitions? I read that more partitions let me add more parallelism to my consumer. But how: by using more consumer threads in a single consumer, or by having 3 consumer instances with 1 thread each?

Question 2:

Are my Java consumer code and config correct?

Can anyone please tell me what I am doing wrong here?



Solution 1:[1]

  1. You can create more than 3 partitions, but it is not clear what speedup you will get; that depends on your network, disks, etc. With N partitions, you will want N consumer threads, each reading one partition, on one or more machines.

  2. Are you sure that all messages are reaching Kafka in the first place? With request.required.acks = 0 the producer never waits for a broker acknowledgement, so failed sends go unnoticed; for an async producer, set request.required.acks to -1 for the best guarantees. Kafka ships a log dump tool (kafka.tools.DumpLogSegments) that can be used to inspect the data on the brokers. On the consumer end, log the Kafka offset of every message you read to verify whether any are skipped.
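
The offset check suggested above can be sketched with plain Java collections. This is only an illustration: OffsetGapTracker is a hypothetical helper, not part of Kafka, and it assumes you feed it the partition and offset of each consumed message (for example from messageAndMetadata.partition() and messageAndMetadata.offset() in the consumer loop). It flags any offset that is not exactly one more than the previous offset seen on the same partition, so replays after a rebalance would be reported as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: remembers the last offset per partition and records non-consecutive offsets. */
final class OffsetGapTracker {
    private final Map<Integer, Long> lastOffset = new HashMap<>();
    private final List<String> gaps = new ArrayList<>();

    /** Call once per consumed message; synchronized because several consumer threads may report. */
    synchronized void record(int partition, long offset) {
        Long previous = lastOffset.get(partition);
        if (previous != null && offset != previous + 1) {
            gaps.add("partition " + partition + ": expected " + (previous + 1) + ", got " + offset);
        }
        lastOffset.put(partition, offset);
    }

    /** Returns a copy of every gap recorded so far. */
    synchronized List<String> gaps() {
        return new ArrayList<>(gaps);
    }

    public static void main(String[] args) {
        OffsetGapTracker tracker = new OffsetGapTracker();
        tracker.record(0, 10);
        tracker.record(0, 11);
        tracker.record(1, 5);
        tracker.record(0, 13); // offset 12 on partition 0 was never seen
        System.out.println(tracker.gaps());
    }
}
```

If the list stays empty over a test run while the database is still missing rows, the loss is more likely on the producer side or in the processing step than in the consumer.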

Solution 2:[2]

First, check the log directory configured as log.dir in the broker's server.properties; you can run "tail -f" on the files there to check whether messages are being sent successfully.

Solution 3:[3]

Answer#1: Yes, you can create 8 partitions on 3 brokers; the number of brokers places no constraint on the number of partitions. For example, suppose you fetch data from 8 tables and publish it to the topic. If your producer uses the table name as the message key and the topic has 8 partitions, all messages for a given table go to the same partition, although two tables can still hash to the same partition.
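
To make the keyed case concrete, here is a minimal sketch of hash-modulo partitioning. It is an assumption modeled on how hash-based partitioners typically work, not a copy of Kafka's implementation, and the table names are made up for illustration. It shows that a key always maps to the same partition, while nothing prevents two different keys from landing on the same one.

```java
import java.util.Map;
import java.util.TreeMap;

final class KeyPartitionDemo {
    /** Hash-modulo partitioning (assumed scheme, for illustration only). */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Maps every key to its partition, sorted by key for readable output. */
    static Map<String, Integer> assign(String[] keys, int numPartitions) {
        Map<String, Integer> result = new TreeMap<>();
        for (String key : keys) {
            result.put(key, partitionFor(key, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical table names used as message keys.
        String[] tables = {"orders", "customers", "invoices", "payments",
                           "products", "shipments", "suppliers", "returns"};
        System.out.println(assign(tables, 8));
    }
}
```

Running it with your real key set is a quick way to see how evenly the keys spread before relying on one partition per table.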

Matching the number of consumers to the number of partitions means that each consumer reads one partition and all of them consume messages in parallel.
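
The "one reader per partition" idea can be sketched without any Kafka dependency. In the simulation below each BlockingQueue stands in for one KafkaStream, and the END sentinel exists only so the demo terminates; both are assumptions for illustration. It matters for the question's code because its run() method walks the streams one after another on a single thread, and a stream iterator that blocks waiting for more messages would keep that thread on the first stream. Also note that topicCountMap, as posted, is never populated, so presumably a put(topic, n) line was dropped from the snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class OneThreadPerStream {
    // Sentinel marking the end of a simulated stream (a real stream has no such marker).
    private static final byte[] END = new byte[0];

    /** Starts one worker per stream and returns how many messages were processed in total. */
    static long consumeAll(List<BlockingQueue<byte[]>> streams) {
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        AtomicLong processed = new AtomicLong();
        for (BlockingQueue<byte[]> stream : streams) {
            pool.submit(() -> {
                try {
                    // take() blocks until a message is available; compare by identity with END.
                    for (byte[] msg = stream.take(); msg != END; msg = stream.take()) {
                        processed.incrementAndGet(); // real code would hand msg to the processing pool
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    /** Builds one pre-filled queue per simulated partition. */
    static List<BlockingQueue<byte[]>> simulatedStreams(int partitions, int messagesEach) {
        List<BlockingQueue<byte[]>> streams = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
            for (int i = 0; i < messagesEach; i++) {
                queue.add(new byte[5000]); // same payload size as in the question
            }
            queue.add(END);
            streams.add(queue);
        }
        return streams;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(simulatedStreams(8, 100))); // prints 800
    }
}
```

In the real consumer the equivalent change would be to request 8 streams for the topic and submit one Runnable per stream to the thread pool, rather than looping over all streams inside a single Runnable.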

By design I would go for 8 consumer instances, each running 1 thread, all in the same consumer group. This way each consumer instance is assigned one partition. If one consumer instance dies, only one partition's data lags behind, not all of it.

Answer#2: Your consumer code has a thread pool of size 20 reading 8 partitions, which means 12 threads will remain idle. Try running 8 instances in parallel with the same group.id and share your results.
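
The idle-thread arithmetic can be checked with a small sketch. It assumes a range-style split in which partitions are handed out in contiguous blocks and any remainder goes to the first threads; the exact algorithm Kafka uses during a rebalance may differ, but the count of idle threads follows from the fact that a partition is read by at most one thread in a group.

```java
import java.util.ArrayList;
import java.util.List;

final class RangeAssignmentDemo {
    /** Splits partitions 0..numPartitions-1 into contiguous ranges, one list per consumer thread. */
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> result = new ArrayList<>();
        int perThread = numPartitions / numThreads;
        int extra = numPartitions % numThreads; // the first 'extra' threads get one more partition
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    /** Counts the threads that end up with no partition at all. */
    static long idleThreads(int numPartitions, int numThreads) {
        return assign(numPartitions, numThreads).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(idleThreads(8, 20)); // prints 12
        System.out.println(assign(8, 3));       // prints [[0, 1, 2], [3, 4, 5], [6, 7]]
    }
}
```

The second call covers Question 1's alternative of 3 consumers: with 8 partitions every thread stays busy, but the load is uneven (3, 3 and 2 partitions).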

Edit: This is also helpful reading

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    teu
Solution 2    ivivi
Solution 3