Always retrieve the latest messages from Kafka on reconnection
I'm writing a piece of code that needs to read hundreds of messages from Kafka every few milliseconds. I'm using C++ and librdkafka. When my program stops and then restarts, it does not need to recover the messages it missed while it was stopped; instead it should always resume reading from the latest messages sent.
As far as I know I can manage consumer offsets by playing with enable.auto.commit and auto.offset.reset. But the latter is only useful when there are no committed offsets, while the former lets me manage the stored offsets myself.
Playing with these two values, I found that if I set enable.auto.commit to false, without committing any offset, and auto.offset.reset to latest, it seems to always retrieve the latest messages; but how clean is this solution?
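Concretely, the configuration I am experimenting with looks roughly like this (a minimal sketch; the broker address and group id are just placeholders):
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr); // placeholder broker
conf->set("group.id", "my-group", errstr);                // placeholder group id
conf->set("enable.auto.commit", "false", errstr);         // never commit offsets
conf->set("auto.offset.reset", "latest", errstr);         // start from the end when no committed offset exists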
My fear is that if two messages are sent between two consumer polls, my consumer only takes the latest one, or that if no messages were sent it keeps re-reading the same one. Both are unwanted behaviours.
Another idea was to clear the consumer group offsets or to seek forward, but the seek method in librdkafka does not seem to work as needed, and I cannot find methods to manage consumer groups.
How can I always read the latest messages from Kafka using librdkafka?
Solution 1:[1]
Finally I solved it by handling the rebalance callback myself. This callback is always executed when a new consumer joins or leaves the group.
The rebalance callback is responsible for updating librdkafka's assignment set based on two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.
So within the rebalance callback I iterate over the TopicPartitions in order to assign them to the consumer, using the latest offsets. The snippet of code is this:
#include <iostream>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            for (auto partition = partitions.begin(); partition != partitions.end(); partition++) {
                // Start every newly assigned partition from its end, i.e. read only new messages.
                (*partition)->set_offset(RdKafka::Topic::OFFSET_END);
                consumer->assign(partitions);
            }
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            consumer->unassign();
        } else {
            std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
        }
    }
};
In order to use that callback, I set it on the configuration object that is used to create the consumer.
SeekEndRebalanceCb ex_rb_cb;
// conf is the RdKafka::Conf used to create the consumer; the callback must be
// registered before KafkaConsumer::create() is called.
if (conf->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    return false;
}
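The consumer is then created from that same configuration and subscribed as usual; a minimal sketch (the topic name is a placeholder):
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    return false;
}

// Subscribing triggers a rebalance, which invokes SeekEndRebalanceCb and
// assigns every partition at OFFSET_END.
std::vector<std::string> topics = {"my_topic"}; // placeholder topic name
RdKafka::ErrorCode err = consumer->subscribe(topics);
if (err != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to subscribe: " << RdKafka::err2str(err) << std::endl;
    return false;
}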
Solution 2:[2]
The consumer->assign(partitions) call should be invoked after the end of the for loop, not inside it:
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            for (auto partition = partitions.begin(); partition != partitions.end(); partition++)
                (*partition)->set_offset(RdKafka::Topic::OFFSET_END);
            // Assign once, after every partition's offset has been set to OFFSET_END.
            consumer->assign(partitions);
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            consumer->unassign();
        } else {
            std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
        }
    }
};
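With either variant, the consume loop itself needs no special handling; a sketch of what it might look like (running is an application-level flag and handle() a hypothetical processing function, both assumptions not in the original answers):
while (running) {
    RdKafka::Message *msg = consumer->consume(100); // poll with a 100 ms timeout
    switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            // A message produced after the assignment; consume() advances the fetch
            // position, so it is not re-read on the next poll.
            handle(msg->payload(), msg->len());
            break;
        case RdKafka::ERR__TIMED_OUT:
            // No new message within the timeout; nothing is re-delivered.
            break;
        default:
            std::cerr << "Consume error: " << msg->errstr() << std::endl;
            break;
    }
    delete msg;
}
consumer->close();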
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | shade |