Only one client consumed all messages in Kafka consumer group
There are 3 clients (Node.js + kafka-node).
Following the tutorial, I created the topic 'quickstart-events':
Topic: quickstart-events TopicId: oZC9g7FvTiOm-QIuPpyotQ PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: quickstart-events Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: quickstart-events Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: quickstart-events Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
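For context, with 3 partitions a group of 3 consumers can own one partition each, but which partition a record lands on is decided by the producer. A minimal sketch of keyed partitioning (a simplified stand-in for Kafka's murmur2-based default partitioner, not the exact algorithm):

```typescript
// Simplified illustration of keyed partitioning: records with the same key
// always land on the same partition; different keys spread across partitions.
// (Kafka really uses murmur2 hashing; this djb2-style hash is just a stand-in.)
function partitionFor(key: string, numPartitions: number): number {
  let hash = 5381;
  for (const ch of key) {
    hash = ((hash * 33) ^ ch.charCodeAt(0)) >>> 0; // keep a 32-bit unsigned int
  }
  return hash % numPartitions;
}

const partitions = ['user-1', 'user-2', 'user-3'].map(k => partitionFor(k, 3));
console.log(partitions); // same key -> same partition on every call
```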
And each Node.js client's app.ts is this:
import { ConsumerGroup, ConsumerGroupOptions } from 'kafka-node';

const options = {
    kafkaHost: 'broker1:9092,broker2:9092,broker3:9092',
    groupId: 'qe',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    commitOffsetsOnFirstJoin: true
} as ConsumerGroupOptions;

const consumerGroup = new ConsumerGroup(options, ['quickstart-events']);
consumerGroup.on('message', async message => {
    console.log(message.value as string);
});
Then I produce messages from Spring Boot (Kotlin):
import com.google.gson.Gson
import org.quartz.JobExecutionContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.scheduling.quartz.QuartzJobBean
import org.springframework.stereotype.Component
import java.time.LocalDateTime

@Component
class Job : QuartzJobBean() {
    @Autowired
    private lateinit var kt: KafkaTemplate<String, Any>
    private val log = LoggerFactory.getLogger(this.javaClass)
    private val gson = Gson()

    override fun executeInternal(context: JobExecutionContext) {
        val metaData = gson.toJsonTree(context.jobDetail.jobDataMap).asJsonObject.toString()
        for (i in 0 until 3) {
            kt.send("quickstart-events", "${LocalDateTime.now()} : $metaData")
        }
    }
}
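One likely cause of the skew: kt.send is called without a key, and since Kafka 2.4 the default producer partitioner is "sticky" for keyless records, so all records in a batch go to one partition instead of being spread round-robin. A hedged sketch contrasting the two keyless strategies (my own simplified model, not Spring Kafka's actual partitioner code):

```typescript
// Simplified model of keyless partition selection strategies.
// Round-robin spreads each record; "sticky" keeps one partition per batch.
class RoundRobinChooser {
  private next = 0;
  choose(numPartitions: number): number {
    return this.next++ % numPartitions;
  }
}

class StickyChooser {
  private current: number;
  constructor(numPartitions: number) {
    this.current = Math.floor(Math.random() * numPartitions);
  }
  choose(): number {
    return this.current; // sticks to one partition until a new batch starts
  }
  onNewBatch(numPartitions: number): void {
    this.current = (this.current + 1) % numPartitions;
  }
}

const rr = new RoundRobinChooser();
console.log([rr.choose(3), rr.choose(3), rr.choose(3)]); // [0, 1, 2] — spread out

const sticky = new StickyChooser(3);
const batch = [sticky.choose(), sticky.choose(), sticky.choose()];
console.log(new Set(batch).size); // 1 — all three records on one partition
```

Under this model, three quick keyless sends in one batch all land on a single partition, which is exactly the observed "one consumer gets everything" pattern.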
First Case.
- First node client
2022-04-29T15:15:40.014 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
- Second node client
- Third node client
Second Case.
- First node client
2022-04-29T15:15:40.416 : hello, world!
2022-04-29T15:15:40.416 : hello, world!
- Second node client
2022-04-29T15:15:40.014 : hello, world!
- Third node client
Why don't the 3 consumers (node clients) consume fairly, like this?
- First node client
2022-04-29T15:15:40.014 : hello, world!
- Second node client
2022-04-29T15:15:40.014 : hello, world!
- Third node client
2022-04-29T15:15:40.014 : hello, world!
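Note that the group coordinator balances partitions, not individual messages: with the roundrobin strategy each of the 3 consumers owns one of the 3 partitions, but if the producer puts all records on a single partition, only the consumer owning that partition sees them. A rough sketch of round-robin partition assignment (the assignment concept only, not kafka-node internals):

```typescript
// Round-robin assignment: deal partitions out to group members in turn.
function assignRoundRobin(members: string[], numPartitions: number): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  for (const m of members) {
    assignment.set(m, []);
  }
  for (let p = 0; p < numPartitions; p++) {
    const owner = members[p % members.length];
    assignment.get(owner)!.push(p);
  }
  return assignment;
}

const result = assignRoundRobin(['node-1', 'node-2', 'node-3'], 3);
console.log([...result.entries()]); // each consumer owns exactly one partition
```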
Solution 1:
I learned that you should set autoCommit = false and commit the offsets manually.
import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';

const options = {
    kafkaHost: ...,
    groupId: ...,
    sessionTimeout: 15000,
    protocol: ['roundrobin', 'range'],
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    autoCommit: false
} as ConsumerGroupOptions;

const consumerGroup = new ConsumerGroup(options, topics);
consumerGroup.on('message', async (message: Message) => {
    await doSomething();
    consumerGroup.commit((error, data) => {
        if (error) {
            console.error(error);
        } else {
            console.debug(data);
        }
    });
});
This is the code I use in my Docker image.
No matter how many consumers I create, messages are consumed without any problems.
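Committing manually after processing gives at-least-once delivery: the offset advances only after the handler succeeds, so a crash mid-processing means the record is redelivered on restart. A toy in-memory model of that commit discipline (no broker involved; `process` here stands in for the doSomething() call above):

```typescript
// Toy model: read records from an in-memory "partition" and commit the
// offset only after processing succeeds, mirroring autoCommit: false.
function consume(records: string[], process: (r: string) => void): number {
  let committed = 0; // next offset a restarted consumer would read from
  for (const record of records) {
    try {
      process(record);
      committed += 1; // commit only after successful processing
    } catch {
      break; // crash: uncommitted records will be redelivered
    }
  }
  return committed;
}

const handled: string[] = [];
const committed = consume(['a', 'b', 'c'], r => {
  if (r === 'c') throw new Error('simulated failure');
  handled.push(r);
});
console.log(committed); // 2 — 'c' stays uncommitted and will be re-read
```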
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | 01hanst