How to get Kafka offset data for a specified timestamp
I tried to get the offsets from a Kafka topic based on a timestamp, but when I ran the code it threw a NullPointerException:
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampsToSearch.put(partition, startTimestamp);
}
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition partition : partitions) {
Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);
}
Any help will be appreciated.
Solution 1:[1]
To find the offsets that correspond to a timestamp, you need to use the offsetsForTimes() method.
For example, this will print the offset for partition 0 of mytopic that corresponds to 1 second ago:
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
System.err.println(offsets);
}
That will display something like:
{mytopic-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}
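The null pointer error in the question is most likely caused by offsetsForTimes() returning a null value for any partition that has no message with a timestamp at or after the requested one, so outOffsets.get(partition).offset() fails on that entry. Below is a minimal sketch of a null-safe resolution step, written in plain Java so it can run without a broker. The class SeekPositions and the method resolve are hypothetical names, not part of the Kafka API, and falling back to a second map of offsets (for example the end offsets) is just one possible policy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of kafka-clients.
class SeekPositions {
    /**
     * Builds the offset to seek to for each partition.
     *
     * @param lookedUp result of a timestamp lookup; values may be null
     * @param fallback offsets to use when the lookup found nothing
     * @param offsetOf extracts the numeric offset from a non-null lookup value
     */
    static <P, V> Map<P, Long> resolve(Map<P, V> lookedUp,
                                       Map<P, Long> fallback,
                                       ToLongFunction<? super V> offsetOf) {
        Map<P, Long> positions = new HashMap<>();
        for (Map.Entry<P, V> entry : lookedUp.entrySet()) {
            V hit = entry.getValue();
            if (hit != null) {
                // A message at or after the timestamp exists: use its offset.
                positions.put(entry.getKey(), offsetOf.applyAsLong(hit));
            } else {
                // No such message: use the fallback offset if one is known,
                // otherwise leave the partition out of the result.
                Long fallbackOffset = fallback.get(entry.getKey());
                if (fallbackOffset != null) {
                    positions.put(entry.getKey(), fallbackOffset);
                }
            }
        }
        return positions;
    }
}
```

With a real consumer this could be called as SeekPositions.resolve(consumer.offsetsForTimes(timestampsToSearch), consumer.endOffsets(partitions), OffsetAndTimestamp::offset), and each resulting entry passed to consumer.seek(partition, offset).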
Solution 2:[2]
You can use Admin.listOffsets with OffsetSpec.forTimestamp:
Map<TopicPartition, OffsetSpec> topicOffsetSpecs = new HashMap<>();
TopicPartition topicPartition = new TopicPartition("topic1", 0);
OffsetSpec offsetSpec = OffsetSpec.forTimestamp(timestamp);
topicOffsetSpecs.put(topicPartition, offsetSpec);
admin.listOffsets(topicOffsetSpecs).all().get(); // Info for given timestamp
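The timestamp variable in the snippet above is not defined there. Both OffsetSpec.forTimestamp and offsetsForTimes expect milliseconds since the Unix epoch, so a calendar date has to be converted first. Below is a small sketch using java.time; the class TimestampMillis and its method names are hypothetical and only illustrate the conversion.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for producing epoch-millisecond timestamps.
class TimestampMillis {
    // Interprets the given date-time as UTC and returns epoch milliseconds.
    static long fromUtc(LocalDateTime utcDateTime) {
        return utcDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Returns the epoch milliseconds of the instant that lies `age` before `now`.
    static long ago(Instant now, Duration age) {
        return now.minus(age).toEpochMilli();
    }
}
```

For example, TimestampMillis.ago(Instant.now(), Duration.ofSeconds(1)) yields the same kind of value as System.currentTimeMillis()-1*1000 in Solution 1, and the result can be passed directly to OffsetSpec.forTimestamp.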
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Mickael Maison |
Solution 2 |