Only one client consumes all messages in a Kafka consumer group

There are 3 clients (Node.js + kafka-node).

Following the tutorial, I created the topic 'quickstart-events'.

Topic: quickstart-events        TopicId: oZC9g7FvTiOm-QIuPpyotQ PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: quickstart-events        Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: quickstart-events        Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: quickstart-events        Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

Each Node.js client's app.ts looks like this.

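import { ConsumerGroup, ConsumerGroupOptions } from 'kafka-node';

const options = {
    kafkaHost: 'broker1:9092,broker2:9092,broker3:9092',
    groupId: 'qe',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    commitOffsetsOnFirstJoin: true
} as ConsumerGroupOptions;

// All three clients join the same group and subscribe to the same topic.
const consumerGroup = new ConsumerGroup(options, ['quickstart-events']);

// Print every message this client receives.
consumerGroup.on("message", async message => {
    console.log(message.value as string);
});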
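For context, here is a rough sketch of what a round-robin assignment of the 3 partitions to the 3 group members would look like. The helper `assignRoundRobin` and the member names are hypothetical. This is not kafka-node's actual implementation, only an illustration of the idea.

```typescript
// Hypothetical helper, NOT kafka-node's code: deals partitions out to the
// sorted group members one at a time, wrapping around when members run out.
function assignRoundRobin(partitions: number[], members: string[]): Map<string, number[]> {
    const assignment = new Map<string, number[]>();
    if (members.length === 0) {
        return assignment; // nobody to assign to
    }
    const sorted = [...members].sort();
    sorted.forEach(member => assignment.set(member, []));
    partitions.forEach((partition, index) => {
        const owner = sorted[index % sorted.length];
        assignment.get(owner)!.push(partition);
    });
    return assignment;
}

// Three partitions (as in quickstart-events) and three made-up member names.
const assignment = assignRoundRobin([0, 1, 2], ["client-a", "client-b", "client-c"]);
assignment.forEach((partitions, member) => {
    console.log(`${member} -> partition(s) ${partitions.join(", ")}`);
});
```

Under this kind of assignment each client owns exactly one partition and only sees messages written to it, so how evenly the clients consume also depends on which partitions the producer writes to.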

Then I produce messages from Spring Boot (Kotlin).

@Component
class Job : QuartzJobBean() {
    @Autowired
    private lateinit var kt: KafkaTemplate<String, Any>

    private val log = LoggerFactory.getLogger(this.javaClass)
    private val gson = Gson()

    override fun executeInternal(context: JobExecutionContext) {
        val metaData = gson.toJsonTree(context.jobDetail.jobDataMap).asJsonObject.toString()
        for (i in 0 until 3) {
            kt.send("quickstart-events", "${LocalDateTime.now()} : $metaData")
        }
    }
}

First Case.

  • First node client
2022-04-29T15:15:40.014 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
  • Second node client
  • Third node client

Second Case.

  • First node client
2022-04-29T15:15:40.416 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
  • Second node client
2022-04-29T15:15:40.014 : hello, world!
  • Third node client

Why don't the 3 consumers (Node clients) consume the messages evenly, like this?

  • First node client
2022-04-29T15:15:40.014 : hello, world!
  • Second node client
2022-04-29T15:15:40.014 : hello, world!
  • Third node client
2022-04-29T15:15:40.014 : hello, world!


Solution 1:[1]

I found that I should set autoCommit to false and commit the offset manually.

const options = {
    kafkaHost: ...,
    groupId: ...,
    sessionTimeout: 15000,
    protocol: ['roundrobin', 'range'],
    fromOffset: "latest",
    commitOffsetsOnFirstJoin: true,
    autoCommit: false
} as ConsumerGroupOptions;

const consumerGroup = new ConsumerGroup(options, topics);

consumerGroup.on("message", async (message: Message) => {
    await doSomething();

    consumerGroup.commit((error, data) => {
        if (error) {
            console.error(error);
        }
        else {
            console.debug(data);
        }
    });
});

This is the code I use in my Docker image.

No matter how many consumers I start, they consume messages without any problems.
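If you would rather await the commit inside the async handler than pass a callback, a small wrapper along these lines may help. `Committable`, `commitAsync`, and `fakeConsumer` are names made up for this sketch. The only assumption about kafka-node is that `ConsumerGroup` exposes `commit(callback)` as used in the code above.

```typescript
// Minimal shape the wrapper relies on: an object with commit(callback).
// Assumed to be compatible with kafka-node's ConsumerGroup.
interface Committable {
    commit(cb: (error: unknown, data: unknown) => void): void;
}

// Hypothetical helper: turns the callback-style commit into a Promise.
function commitAsync(consumer: Committable): Promise<unknown> {
    return new Promise((resolve, reject) => {
        consumer.commit((error, data) => {
            if (error) {
                reject(error);
            } else {
                resolve(data);
            }
        });
    });
}

// Stand-in for a real ConsumerGroup so the sketch runs without a broker.
const fakeConsumer: Committable = {
    commit: cb => cb(null, { committed: true }),
};

commitAsync(fakeConsumer)
    .then(data => console.debug("commit result:", data))
    .catch(error => console.error(error));
```

In the handler this would read `await doSomething(); await commitAsync(consumerGroup);`, so a failed commit surfaces as a rejected promise instead of only being logged.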

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 01hanst