Flink Missing Events With Windowed Processor (Event Time Windows) and Kafka Source

We have a streaming job with 20 separate pipelines. Each pipeline has one or more Kafka topic sources, and some pipelines use a windowed processor while others use a non-windowed processor.

We are noticing data loss in the windowed-processor pipelines when the job goes down and takes some time to recover, or when the job needs to be restarted.

  • I have set a UID for all of the operators, and I can see in the logs that offsets are being restored from the savepoint for the Kafka consumer operator.

  • We are using BoundedOutOfOrdernessTimestampExtractor to assign watermarks based on event time:

// Assumes Lombok's @Slf4j for the log field and Jackson for the debug logging.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

@Slf4j
public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> {

    // Reuse one ObjectMapper instead of allocating a new one per event;
    // static fields are not captured by Flink's serialization.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Event element) {
        try {
            log.info("event to be processed, event:{}", MAPPER.writeValueAsString(element));
        } catch (JsonProcessingException e) {
            log.warn("could not serialize event for logging", e);
        }
        long ts = (long) Double.parseDouble(element.getTs());
        // Normalize second-resolution timestamps (< 13 digits) to milliseconds.
        return String.valueOf(ts).length() < 13 ? ts * 1000 : ts;
    }
}

The pipeline configuration looks something like this:

  • NON-WINDOWED
SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .process(new S3EventProcessor()).uid("...")
  .addSink();
  • WINDOWED
SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .assignTimestampsAndWatermarks(
    new KafkaEventTimestampExtractor(Time.seconds(4)))
  .windowAll(TumblingEventTimeWindows.of(
    Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
  .process(new S3EventProcessor()).uid("...")
  .addSink();
  • Let's say the job is down for 30 minutes. In that case, the pipelines that do not use a window processor do not miss any data, but partial data from those 30 minutes is missed by the windowed processor.

  • When we increase the out-of-orderness delay for the time windows, i.e. from 4 seconds to 30 minutes, events are no longer missed as long as the application is back up within 30 minutes. But this gets us nowhere near a solution: a delay of more than 1 minute is infeasible for us, and there would be too many live windows, which would mean a huge infrastructure change for us.
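
One standard way to keep the watermark delay small while still tolerating long gaps is allowed lateness: the watermark delay stays at 4 seconds, but window state is retained so that late events update the window result instead of being dropped. A rough sketch against the windowed pipeline above (the 30-minute bound is illustrative, and each late event re-fires the window, so the sink must tolerate updated results):

SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .assignTimestampsAndWatermarks(
    new KafkaEventTimestampExtractor(Time.seconds(4)))
  .windowAll(TumblingEventTimeWindows.of(
    Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
  // Keep window state around for 30 more minutes; late events within
  // this bound re-fire the window instead of being silently dropped.
  .allowedLateness(Time.minutes(30))
  .process(new S3EventProcessor()).uid("...")
  .addSink();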



Solution 1:[1]

The only scenario I can imagine that might explain this is if the event timestamps are affected by the outage. Then a 30-minute outage would cause a 30-minute gap in the timestamps, and with out-of-order ingestion, a 4-second bounded-out-of-orderness strategy will yield some late events that will be dropped by the window.
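
One way to test this hypothesis is to route late records to a side output instead of silently dropping them; whatever lands there during recovery is exactly the data the window discarded. A minimal sketch, reusing the pipeline from the question (Result stands in for whatever type S3EventProcessor emits):

// Anonymous subclass so Flink can capture the element type.
final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> windowed = SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .assignTimestampsAndWatermarks(
        new KafkaEventTimestampExtractor(Time.seconds(4)))
    .windowAll(TumblingEventTimeWindows.of(
        Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    // Late records go to the side output instead of being dropped.
    .sideOutputLateData(lateTag)
    .process(new S3EventProcessor());

// Log or persist what would otherwise be lost.
windowed.getSideOutput(lateTag).print();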

Solution 2:[2]

This was happening due to a mistake in my pipeline: instead of setting the timestamp assigner on the FlinkKafkaConsumer, it was added to the data stream generated from the FlinkKafkaConsumer.
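
For reference, the fix amounts to calling assignTimestampsAndWatermarks on the consumer itself, so watermarks are generated per Kafka partition inside the source rather than on the already-merged stream. A rough sketch, with the topic name, deserialization schema, and properties as placeholders:

FlinkKafkaConsumer<Event> consumer = new FlinkKafkaConsumer<>(
    "events-topic", new EventDeserializationSchema(), kafkaProps);

// Per-partition watermarking: each Kafka partition gets its own
// watermark, and the source emits the minimum across partitions.
consumer.assignTimestampsAndWatermarks(
    new KafkaEventTimestampExtractor(Time.seconds(4)));

DataStream<Event> stream = env.addSource(consumer);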

This change fixed the issue at my end for automatic recovery, but in case of a manual restart after changes to the pipeline, some data is still missed for the last window that was open when the job stopped.

Note: we are using checkpoints for manual recovery. As per the docs, checkpoints are ideal for automatic recovery in case of job failures.
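
For what it's worth, recovering manually from a checkpoint only works if checkpoints are externalized and retained on cancellation; otherwise they are cleaned up when the job stops. A minimal sketch of that configuration (the interval is illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 seconds (illustrative interval).
env.enableCheckpointing(60_000);

// Retain the last completed checkpoint when the job is cancelled,
// so it can be used to restore state after a manual restart.
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

That said, the Flink docs recommend savepoints for planned restarts and upgrades; checkpoints are designed for automatic failure recovery.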

Any note on this would help: do we need to create a savepoint when we make changes to the pipeline and restart it manually, or can we make a complete recovery with a checkpoint?

Our only concern with using savepoints is the reprocessing of the same events that might happen, which is not ideal for us in a few cases.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2