Retrieve timestamp-based data from Kafka
How can I get messages or data from a Kafka cluster for a specified day, for example 13 September? Can anyone provide me with code for this? I have googled it and found only theory, but I want the code.
Solution 1:[1]
There is no built-in access method for this. Also, before Kafka v0.10, messages do not contain any timestamp information, so it is impossible to know when a message was written into a topic.
As of Kafka v0.10, each message contains a metadata timestamp attribute that is set either by the producer at message creation time or by the broker at message insertion time. A time-based index is planned, but not available yet. Thus, you need to consume the whole topic and check the timestamp field (ignoring all messages you are not interested in). To find the beginning faster, you could also do a binary search over offsets and timestamps to locate the first relevant message (sketched below).
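For brokers that already have record timestamps (>= 0.10) but no time index, that binary-search idea could be sketched like this with kafka-python. find_first_offset is a hypothetical helper written for illustration; it assumes a consumer that is already assigned to the partition:

from kafka import KafkaConsumer, TopicPartition

# Hypothetical helper: binary search over offsets for the first record with
# a timestamp >= target_ts, probing the log with seek() plus a single read.
def find_first_offset(consumer, tp, target_ts):
    lo = consumer.beginning_offsets([tp])[tp]
    hi = consumer.end_offsets([tp])[tp]
    while lo < hi:
        mid = (lo + hi) // 2
        consumer.seek(tp, mid)
        record = next(consumer)          # fetch one record at (or after) mid
        if record.timestamp < target_ts:
            lo = record.offset + 1
        else:
            hi = mid
    return lo

Each probe costs one seek plus one fetch, so this needs O(log n) round trips instead of scanning the partition from the beginning.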
Update:
Kafka 0.10.1 adds a time-based index. It allows you to seek to the first record with a timestamp equal to or larger than a given timestamp, via KafkaConsumer#offsetsForTimes(). This returns the corresponding offsets, which you can feed into KafkaConsumer#seek(). From there you just consume the data and check each record's timestamp via ConsumerRecord#timestamp() to see when you can stop processing.
Note that data is strictly ordered by offset but not by timestamp. Thus, during processing, you might get "late" records with a smaller timestamp than your start timestamp (you can simply skip over those records).
A more difficult problem is late-arriving records at the end of your search interval. After you see the first record with a timestamp larger than your search interval, there might still be records later in the log whose timestamps fall within your search interval (if those records were appended to the topic "late"). There is no way to know this in advance, though. Thus, you might want to keep reading "some more" records and check whether any "late" records turn up. How many records "some more" means is a design decision you need to make yourself.
There is no general guideline, though -- if you have additional knowledge about your "write pattern", it can help to define a good strategy for how many records to consume after your search interval has "ended". Of course, there are two default strategies: (1) stop at the very first record with a timestamp larger than your search interval (effectively ignoring any late-arriving records -- if you use the "log append time" configuration, this is of course a safe strategy); (2) read to the end of the log -- this is the safest strategy with regard to completeness, but it might result in prohibitive overhead (note also that, since records can be appended at any time and a record's "delay" can be arbitrarily large, a late record might even be appended after you reach end-of-log).
In practice, it might be a good idea to think about a "max expected delay" and read until you get a record with a timestamp larger than this upper delay bound.
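For illustration, here is a minimal kafka-python sketch of this seek-and-stop approach (the same idea as the Java methods named above). The broker address, topic name, partition, timestamps, and delay bound are assumptions made for the example, not part of the original answer:

from kafka import KafkaConsumer, TopicPartition

START_TS = 1663027200000   # assumed interval: 13 Sep 2022 00:00 UTC, in ms
END_TS = 1663113600000     # 14 Sep 2022 00:00 UTC
MAX_DELAY_MS = 60000       # assumed "max expected delay" for late records

tp = TopicPartition('events', 0)   # assumed single-partition topic
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=10000)
consumer.assign([tp])

# offsets_for_times() is the kafka-python counterpart of the Java
# KafkaConsumer#offsetsForTimes(): it maps a timestamp to the earliest
# offset whose record timestamp is >= that timestamp.
start = consumer.offsets_for_times({tp: START_TS})[tp]
if start is not None:
    consumer.seek(tp, start.offset)
    for msg in consumer:
        # Records are ordered by offset, not timestamp: skip early strays,
        # and stop only once we are past the interval plus the delay bound.
        if msg.timestamp > END_TS + MAX_DELAY_MS:
            break
        if START_TS <= msg.timestamp <= END_TS:
            print(msg.offset, msg.timestamp, msg.value)

The consumer_timeout_ms setting ends the iteration when no further records arrive, which corresponds to reaching end-of-log before the delay bound is exceeded.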
Solution 2:[2]
Add --property print.timestamp=true to the console consumer command. That will print the timestamp of each record, e.g. CreateTime:1609917197764.
Example: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --property print.timestamp=true --from-beginning
Solution 3:[3]
Getting data for a specific day from Kafka is NOT efficient, as the data is stored linearly inside Kafka on each broker's storage system. Therefore, even if you have a timestamp inside each of your messages, or use Kafka's message metadata, which can contain a timestamp in later Kafka message versions (>=0.10), you still have to scan the entire topic on each partition to get the data, because the data inside Kafka is indexed only by offset, not by date.
Remember, Kafka is a queue, NOT a database. If you want this date-based retrieval pattern, you might want to consider storing the Kafka messages in another, more suitable database system and using the timestamp as your index.
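As a hedged illustration of that suggestion, the pattern could look like this with kafka-python and SQLite; the topic, table, and file names are made up for the example:

import sqlite3
from kafka import KafkaConsumer

db = sqlite3.connect('events.db')
db.execute('CREATE TABLE IF NOT EXISTS events (ts INTEGER, value BLOB)')
db.execute('CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts)')

# Mirror the topic into the table, using the record timestamp as the index key.
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=10000)
for msg in consumer:
    db.execute('INSERT INTO events VALUES (?, ?)', (msg.timestamp, msg.value))
db.commit()

# Date-based retrieval then becomes an indexed range query, e.g. 13 Sep 2022:
rows = db.execute('SELECT value FROM events WHERE ts BETWEEN ? AND ?',
                  (1663027200000, 1663113599999)).fetchall()

In a real setup this mirroring would run continuously rather than as a one-shot script, but the idea is the same: let the database, not Kafka, answer time-range queries.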
Solution 4:[4]
I am new to Kafka and this solution looks hacky to me, but I would like to add at least one solution for this question.
In my case I use kafka-python==2.0.2.
This code reads all messages starting from April 5, 2022, but you can find a 'till offset' in the same fashion.
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'test'
FROM_TIMESTAMP = 1649152610480  # April 5, 2022, in milliseconds

consumer = KafkaConsumer(TOPIC)

# Seek each partition to the offset corresponding to FROM_TIMESTAMP.
for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    start_offset = consumer.beginning_offsets([tp])[tp]  # shown for reference
    end_offset = consumer.end_offsets([tp])[tp]
    for_time = consumer.offsets_for_times({tp: FROM_TIMESTAMP})
    offset_position = for_time[tp]
    # If no record at or after FROM_TIMESTAMP exists, fall back to the end offset.
    offset = end_offset
    if offset_position:
        offset = offset_position.offset
    consumer.seek(tp, offset)

for msg in consumer:
    print(msg)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | chandrasekar |
Solution 3 | linehrr |
Solution 4 |