Flink checkpoint not replaying the Kafka events that were in process during the savepoint/checkpoint

I want to test end-to-end exactly once processing in flink. My job is:

Kafka-source -> mapper1 -> mapper2 -> kafka-sink

I put a Thread.sleep(100000) in mapper1 and ran the job. I took a savepoint while stopping the job and then removed the Thread.sleep(100000) from mapper1, expecting the in-flight event to be replayed on restart since it had never reached the sink. But that didn't happen: the job just sits waiting for new events.
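
Roughly, mapper1 looked like this (a simplified sketch; the variable names and surrounding code are illustrative, not the actual job):

// Simplified mapper1: the sleep keeps the current event "in flight",
// so it has not reached the sink yet when the job is stopped.
DataStream<String> mapped1 = sourceStream
        .map(value -> {
            Thread.sleep(100000); // removed after taking the savepoint
            return value;
        });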

My Kafka source:

KafkaSource.<String>builder()
        .setBootstrapServers(consumerConfig.getBrokers())
        .setTopics(consumerConfig.getTopic())
        .setGroupId(consumerConfig.getGroupId())
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();

My Kafka sink:

KafkaSink.<String>builder()
        .setBootstrapServers(producerConfig.getBootstrapServers())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(producerConfig.getTopic())
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

My environment setup for the Flink job:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

environment.enableCheckpointing(2000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
environment.getCheckpointConfig().setCheckpointTimeout(60000);
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointTimeout(1000);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
environment.configure(configuration);

What am I doing wrong here? I want any event that is in process during the cancellation/stop of the job to be reprocessed after the restart.

EDIT 1: I observed that Kafka shows an offset lag for my Flink job's kafka-source consumer group. I am assuming this means my checkpointing is behaving correctly, is that right?

I also observed that when I restarted my job from the checkpoint, it did not start consuming from the remaining offsets, even though I have the consumer offsets set to EARLIEST. I had to send more events to trigger consumption on the kafka-source side, and then it consumed all the events.



Solution 1:

For exactly-once, you must provide a TransactionalIdPrefix that is unique across all applications running against the same Kafka cluster (this is a change compared to the legacy FlinkKafkaProducer). Flink derives the Kafka transactional.id of each internal producer from this prefix, so two applications sharing a prefix can fence and abort each other's transactions:

KafkaSink<T> sink =
        KafkaSink.<T>builder()
                .setBootstrapServers(...)
                .setKafkaProducerConfig(...)
                .setRecordSerializer(...)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("unique-id-for-your-app")
                .build();

When resuming from a checkpoint, Flink always uses the offsets stored in the checkpoint rather than those configured in the code or stored in the broker.
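
To actually pick up those stored offsets, the job has to be resumed from the savepoint or retained checkpoint, e.g. with flink run -s <checkpoint-path> on the CLI. For a local test the same can be done through the execution.savepoint.path option; a minimal sketch, assuming a retained checkpoint under the configured file:///tmp/flink-checkpoints directory (the concrete <job-id>/chk-42 path is hypothetical):

// Resume from a retained checkpoint so the source starts from the offsets
// stored in it rather than from the OffsetsInitializer configured in code.
Configuration config = new Configuration();
config.setString("execution.savepoint.path",
        "file:///tmp/flink-checkpoints/<job-id>/chk-42"); // example path
StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment(config);

Started this way, the source resumes from the checkpointed offsets; OffsetsInitializer.latest() only applies to a fresh run with no restored state.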

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: David Anderson