Is there a way to manually store Kafka offsets so a consumer never misses messages?

I'm using the PHP Laravel framework to consume Kafka messages with the help of the mateusjunges/laravel-kafka package.

Is it possible for the consumer to save the offset in, for example, Redis or a DB? And, when the broker shuts down and comes back up, can the consumer be told to start consuming messages from that specific offset?

Let's say I have a Laravel Artisan command that builds the following consumer:

// Assumes (among others) the following classes from flix-tech/avro-serde-php
// and guzzlehttp/guzzle are imported:
// use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
// use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
// use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
// use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
// use FlixTech\AvroSerializer\Objects\RecordSerializer;
// use GuzzleHttp\Client;
public function handle()
{
    $topics = [
        'fake-topic-1',
        'fake-topic-2',
        'fake-topic-3'
    ];

    $cachedRegistry = new CachedRegistry(
        new BlockingRegistry(
            new PromisingRegistry(
                new Client(['base_uri' => 'https://fake-schema-registry.com'])
            )
        ),
        new AvroObjectCacheAdapter()
    );        

    $registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry);
    $recordSerializer = new RecordSerializer($cachedRegistry);

    foreach ($topics as $topic) 
    {
        $registry->addKeySchemaMappingForTopic(
            $topic,
            new \Junges\Kafka\Message\KafkaAvroSchema($topic . '-key')
        );
        $registry->addBodySchemaMappingForTopic(
            $topic,
            new \Junges\Kafka\Message\KafkaAvroSchema($topic . '-value')
        );
    }

    $deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer);

    $consumer = \Junges\Kafka\Facades\Kafka::createConsumer(
        $topics, 'fake-test-group', 'fake-broker.com:9999')
    ->withOptions([
        'security.protocol' => 'SSL',
        'ssl.ca.location' => storage_path() . '/client.keystore.crt',
        'ssl.keystore.location' => storage_path() . '/client.keystore.p12',
        'ssl.keystore.password' => 'fakePassword',
        'ssl.key.password' => 'fakePassword',
    ])
    ->withAutoCommit()
    ->usingDeserializer($deserializer)
    ->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) {

        KafkaMessagesJob::dispatch($message)->onQueue('kafka_messages_queue');

    }) 
    ->build();    
    
    $consumer->consume();
}

My problem now is that, from time to time, "fake-broker.com:9999" shuts down, and when it comes back up, the consumer misses a few messages...

  • offset_reset is set to latest;
  • The option auto.commit.interval.ms is not set in the ->withOptions() method, so it uses the default value (5 seconds, I believe);
  • auto_commit is set to true, and the consumer is also built with the ->withAutoCommit() option;
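For clarity, the three points above translate into roughly the following effective librdkafka settings (a sketch; the keys are standard librdkafka properties, and auto.commit.interval.ms defaults to 5000 ms):

```php
// Effective consumer configuration implied by the points above.
$options = [
    'auto.offset.reset'       => 'latest', // from offset_reset
    'enable.auto.commit'      => 'true',   // from auto_commit / ->withAutoCommit()
    'auto.commit.interval.ms' => '5000',   // librdkafka default (5 s)
];
```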

Let me know if you guys need any additional information ;) Thank you in advance.

EDIT: According to this thread here, I should set my "offset_reset" to "earliest", not "latest". Even though, I'm almost 100% sure an offset is committed (somehow, somewhere stored), because I am using the same consumer group ID on the same partition (0), so the "offset_reset" is not even taken into consideration, I'm assuming...



Solution 1:[1]

somehow, somewhere stored

Kafka consumer groups store offsets in Kafka itself (the __consumer_offsets topic). Storing them externally therefore doesn't really help, because you need Kafka to be up regardless.

The offset_reset value only matters when the group has no committed offset. The consumer client isn't (shouldn't be? I don't know the PHP client code) "caching" the offsets locally, and broker restarts should preserve any committed values. If you want to guarantee that every message gets committed, you need to disable auto-commit and handle committing yourself.

You can optionally inspect the message in your handler function and store that message's offset somewhere else, but then you are fully responsible for seeking the consumer when it starts back up (again, you want to disable all commit functionality in the consumer, and also set the auto.offset.reset consumer config to none rather than latest/earliest).
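To make the process-then-commit idea above concrete, here is a minimal sketch using the underlying php-rdkafka extension directly (which laravel-kafka wraps). `handleMessage()` is a hypothetical placeholder for your own processing; the broker, topic, and group names are taken from the question:

```php
<?php
// Auto-commit is disabled, so an offset is only committed after the
// handler has finished successfully.
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'fake-broker.com:9999');
$conf->set('group.id', 'fake-test-group');
$conf->set('enable.auto.commit', 'false');

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['fake-topic-1']);

while (true) {
    $message = $consumer->consume(10000); // poll timeout in ms
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        handleMessage($message);     // your processing logic (hypothetical)
        $consumer->commit($message); // synchronous commit of this offset
        // Optionally also record $message->offset in Redis/DB here.
    }
}
```

If the process crashes between handling and committing, the message is redelivered on restart, so the handler should be idempotent.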

Solution 2:[2]

@OneCricketeer thank you for the answer and explanation. You made yourself pretty clear, and I understand better now how this works. However, setting my auto.offset.reset to none triggers an error saying: "Invalid value "none" for configuration property "auto.offset.reset"".

I will probably not do this, but let's say I store the offset of every message I read, I disable all commit functionality in the consumer, and I somehow set auto.offset.reset to none, as you said.

How can I tell the consumer to start consuming messages from X offset, where X is the offset I stored from the last message I read?
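For what it's worth, with the underlying php-rdkafka extension you can bypass committed offsets entirely by assigning partitions with an explicit starting offset (note that librdkafka's auto.offset.reset accepts "error" rather than the Java client's "none", which would explain the "Invalid value" error above). A hedged sketch, where $storedOffset stands in for a value loaded from Redis or a DB:

```php
<?php
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'fake-broker.com:9999');
$conf->set('group.id', 'fake-test-group');
$conf->set('enable.auto.commit', 'false');

$consumer = new \RdKafka\KafkaConsumer($conf);

// Last processed offset + 1, e.g. loaded from Redis or a DB (hypothetical).
$storedOffset = 42;

// assign() (instead of subscribe()) starts reading partition 0 of
// 'fake-topic-1' at exactly $storedOffset, ignoring the group's
// committed offsets.
$consumer->assign([
    new \RdKafka\TopicPartition('fake-topic-1', 0, $storedOffset),
]);
```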

Solution 3:[3]

It is possible using a MySQL database and the Debezium connector.

Debezium reads the database binlog; in the scenario where your Kafka server shuts down, once it comes back up, the Debezium connector will resume from the last known position in the binlog.

It is necessary to set some parameters in my.cnf to make sure database changes are recorded in the order in which they are committed.

https://debezium.io/documentation/reference/stable/connectors/mysql.html
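For reference, the binlog-related settings that the Debezium MySQL connector documentation calls for look roughly like this in my.cnf (values are illustrative; check the linked docs for your MySQL version):

```ini
[mysqld]
server-id        = 223344        # any unique, non-zero ID
log_bin          = mysql-bin     # enable the binlog
binlog_format    = ROW           # Debezium requires row-based logging
binlog_row_image = FULL          # log full row images
expire_logs_days = 10            # keep enough binlog history for recovery
```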

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 OneCricketeer
Solution 2 Adriano Ramalho
Solution 3 Luis Estrada