'Spark + Read kafka topic from a specific offset based on timestamp

How do I set a spark job to pick up a kafka topic from a specific offset based on a timestamp ? Let's say that I need to get all data from a kafka topic starting 6 hours ago.



Solution 1:[1]

Kafka does not work in that way. You are seeing Kafka like something you can query with another different parameter than offset, besides keep in mind that topic can have more than one partition so each one has a different one. Maybe you can use another relational storage to map offset/partition with timestamp, a little bit risky. Thinking in akka stream kafka consumer, for example, each of your request by timestamp should be send via another topic to activate your consumers(each of them with one ore more partitions assigned) and query for the specific offset, produce and merge. With Spark, you can adjust your consumer strategies for each job but the process should be the same.

Another thing is if your Kafka recovers it´s possible that you need to read the whole topic to update your pair (timestamp/offset). All of this can sound a little bit weird and maybe it should be better to store your topic in Cassandra (for example) and you can query it later.

Solution 2:[2]

The answers provided here seems to be dated. As with the latest API documentation for Spark 3.x.x given here, Structured Streaming Kafka Integration

There are quite few flexible ways in which the messages can be retrieved between a specified window from Kafka.

An example code for the batch api that get messages from all the partitions, that are between the window specified via startingTimestamp and endingTimestamp, which is in epoch time with millisecond precision.

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("startingTimestamp", 1650418006000)
  .option("endingTimestamp", 1650418008000)
  .load()

Solution 3:[3]

Kafka is an append-only log storage. You can start consuming from a particular offset in a partition given that you know the offset. Consumption is super fast, you can have a design where you start from the smallest offset and start doing some logic only once you have come across a message (which could probably have a timestamp field to check for).

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Emiliano Martinez
Solution 2
Solution 3 void