Apache Flink Stream Event Delay

I wrote the following code, but it doesn't work. I would like to use Apache Flink to delay events whose time (specified in the `timestamp` field) is later than the current time, so that each such event is emitted only once its time is reached.

Sample:

  • Current Date: 2022-05-06 10:30

  • Event 1: { "user1": "1", "user2": "2", "timestamp": "2022-05-06 10:30" } --> OK (sent to the sink immediately)

  • Event 2: { "user1": "1", "user2": "2", "timestamp": "2022-05-06 13:30" } --> DELAY (sent to the sink at 13:30)

FlinkLikePipeline

/**
 * Entry point for the Kafka-to-Kafka job that reads {@code Like} records from one topic,
 * processes them per key with {@code MatchFunction}, and writes {@code Matches} records to
 * another topic.
 */
public class FlinkLikePipeline {

    private static final String INPUT_TOPIC = "flink_input";
    private static final String OUTPUT_TOPIC = "flink_output";
    private static final String CONSUMER_GROUP = "baeldung";
    private static final String KAFKA_ADDRESS = "localhost:9092";

    /**
     * Wires source, keyed processing and sink together, then runs the job; the call blocks in
     * {@code execute} for as long as the job runs.
     *
     * @throws Exception if building or executing the job fails
     */
    public static void createBackup() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // NOTE(review): InputMessageTimestampAssigner is punctuated, so this periodic
        // interval presumably has no effect on watermark emission — confirm before relying on it.
        env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

        FlinkKafkaConsumer011<Like> source =
                Consumers.createMsgLike(INPUT_TOPIC, KAFKA_ADDRESS, CONSUMER_GROUP);
        source.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Matches> sink =
                new FlinkKafkaProducer011<>(KAFKA_ADDRESS, OUTPUT_TOPIC, new BackupSerializationSchema());

        env.addSource(source)
                .keyBy(like -> like.getId())
                .process(new MatchFunction())
                .addSink(sink);

        env.execute("Keyed Process Function Example");
    }

    /** Launches the job; command-line arguments are not used. */
    public static void main(String[] args) throws Exception {
        createBackup();
    }
}

InputMessageTimestampAssigner

/**
 * Assigns each {@code Like} the wall-clock time at which it passes through the source, and
 * emits a watermark equal to that time after every element.
 *
 * <p>NOTE(review): the Like's own {@code timestamp} field is not consulted, so the resulting
 * event time is effectively ingestion time. Because watermarks are produced per element, event
 * time only advances when a new record arrives.
 */
public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<Like> {

    @Override
    public long extractTimestamp(Like element, long previousElementTimestamp) {
        // Current wall-clock milliseconds; neither argument influences the result.
        return System.currentTimeMillis();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Like lastElement, long extractedTimestamp) {
        // Watermark follows the timestamp just assigned to this element.
        return new Watermark(extractedTimestamp);
    }
}

MatchFunction

public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

ValueState<Like> liked;

@Override
public void open(Configuration parameters) throws Exception {
    liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
}

@Override
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {

    Timestamp now = new Timestamp(System.currentTimeMillis());
    Timestamp sendAt = newLike.getTimestamp();

    if (sendAt.before(now)) {
        liked.clear();
        out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
    } else {
        // schedule the next timer by sendAt
        liked.update(newLike);
        ctx.timerService().registerEventTimeTimer(sendAt.getTime());
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
    Like like = liked.value();
    Timestamp now = new Timestamp(System.currentTimeMillis());
    Timestamp sendAt = like.getTimestamp();
    if (now == sendAt) {
        liked.clear();
        out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
    }

}

}

Sample JSON message written to the Kafka input topic: { "user1": "1", "user2": "2", "timestamp": "2022-05-06T11:32:00.000Z" }

Thank you for your support, Giuseppe.



Solution 1:[1]

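Try this: register processing-time timers instead of event-time timers. An event-time timer fires only when the watermark passes it. With a punctuated assigner, the watermark advances only when a new record arrives, so a delayed event would wait for unrelated traffic. A processing-time timer fires according to the wall clock.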

public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

ValueState<Like> liked;

@Override
public void open(Configuration parameters) throws Exception {
    liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
}

@Override
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {

    Timestamp now = new Timestamp(ctx.timestamp());  // CHANGE: Use context timestamp
    Timestamp sendAt = newLike.getTimestamp();

    if (sendAt.before(now)) {
        liked.clear();
        out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
    } else {
        // schedule the next timer by sendAt
        liked.update(newLike);
        ctx.timerService().registerProcessingTimeTimer(sendAt.getTime()); // CHANGED: change to processing time
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
    Like like = liked.value();
    Timestamp now = new Timestamp(timestamp);  // CHANGE: Use current timestamp
    Timestamp sendAt = like.getTimestamp();
    if (now >= sendAt) { // CHANGE: Should try equals or great
        liked.clear();
        out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
    }

}

and change `createBackup()` to use processing time:

/**
 * Builds the Kafka-to-Kafka job with processing time as the stream time characteristic, then
 * runs it; the call blocks in {@code execute} for as long as the job runs.
 *
 * @throws Exception if building or executing the job fails
 */
public static void createBackup() throws Exception {
    final String kafkaAddress = "localhost:9092";
    final String consumerGroup = "baeldung";
    final String inputTopic = "flink_input";
    final String outputTopic = "flink_output";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Order matters: the characteristic is set first, then the watermark interval is configured.
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

    FlinkKafkaConsumer011<Like> source = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
    // NOTE(review): processing-time timers do not need watermarks; the assigner matters only if
    // MatchFunction reads ctx.timestamp() — check before removing it.
    source.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

    FlinkKafkaProducer011<Matches> sink =
            new FlinkKafkaProducer011<>(kafkaAddress, outputTopic, new BackupSerializationSchema());

    env.addSource(source)
            .keyBy(like -> like.getId())
            .process(new MatchFunction())
            .addSink(sink);

    env.execute("Keyed Process Function Example");
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1