Apache Flink Stream Event Delay
I tried to develop the following code, but it doesn't work. I would like to use Apache Flink to delay any event whose time (specified in its timestamp field) differs from the current date and time, i.e. lies in the future, and forward it only once that time is reached.
Current Date: 2022-05-06 10:30
Event 1: { "user1": "1", "user2": "2", "timestamp": "2022-05-06 10:30" } --> OK (sent to the SINK immediately)
Event 2: { "user1": "1", "user2": "2", "timestamp": "2022-05-06 13:30" } --> DELAY (will be sent at 13:30)
// Pipeline definition from the question: reads Like records from Kafka, keys them by id,
// and runs them through MatchFunction. Closing braces were lost when this snippet was extracted.
public class FlinkLikePipeline {
// Builds the streaming job and executes it.
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
// NOTE(review): `producer` is constructed here but is never attached to `matches` in the
// visible code, so no sink is registered for the output topic.
FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
new BackupSerializationSchema());
DataStream<Like> likes = environment.addSource(consumer);
// One MatchFunction instance per key; the key is the value returned by Like.getId().
DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
environment.execute("Keyed Process Function Example");
// Entry point; its body is not shown in this excerpt.
public static void main(String[] args) throws Exception {
// Timestamp/watermark assigner from the question. It stamps elements with the wall-clock
// time at extraction rather than with a value taken from the element itself.
public class InputMessageTimestampAssigner implements
AssignerWithPunctuatedWatermarks<Like> {
// Returns the current system time in milliseconds; `element` and
// `previousElementTimestamp` are not consulted.
public long extractTimestamp(Like element, long previousElementTimestamp) {
Timestamp now = new Timestamp(System.currentTimeMillis());
return now.getTime();
// Returns a watermark equal to the timestamp that was just extracted.
public Watermark checkAndGetNextWatermark(Like lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
// Original (non-working) keyed process function from the question. It is meant to forward
// a Like immediately when its timestamp is in the past and to delay it otherwise.
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {
// Keyed state holding a single Like.
// NOTE(review): nothing in the visible code ever writes to this state.
ValueState<Like> liked;
// Obtains the keyed state handle named "like".
public void open(Configuration parameters) throws Exception {
liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
// Emits the element right away when its timestamp is strictly before the current wall-clock time.
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
Timestamp now = new Timestamp(System.currentTimeMillis());
Timestamp sendAt = newLike.getTimestamp();
if (sendAt.before(now)) {
out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
} else {
// NOTE(review): this branch contains only the comment below; no timer is registered
// and the element is not stored, so onTimer has nothing to fire on.
// schedule the next timer by sendAt
// Timer callback: reads the stored Like and emits it if the comparison below holds.
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
Like like = liked.value();
Timestamp now = new Timestamp(System.currentTimeMillis());
Timestamp sendAt = like.getTimestamp();
// NOTE(review): `==` compares object references here, not the instants they represent.
if (now == sendAt) {
out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
Producer Topic Kafka (Json Sample): { "user1": "1", "user2": "2", "timestamp": "2022-05-06T11:32:00.000Z" }
Thank you for your support, Giuseppe.
Solution 1:[1]
Try this:
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {
ValueState<Like> liked;
public void open(Configuration parameters) throws Exception {
liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
Timestamp now = new Timestamp(ctx.timestamp()); // CHANGE: Use context timestamp
Timestamp sendAt = newLike.getTimestamp();
if (sendAt.before(now)) {
out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
} else {
// schedule the next timer by sendAt
ctx.timerService().registerProcessingTimeTimer(sendAt.getTime()); // CHANGED: change to processing time
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
Like like = liked.value();
Timestamp now = new Timestamp(timestamp); // CHANGE: Use current timestamp
Timestamp sendAt = like.getTimestamp();
if (now >= sendAt) { // CHANGE: Should try equals or great
out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // CHANGED: to processingtime
FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
new BackupSerializationSchema());
DataStream<Like> likes = environment.addSource(consumer);
DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
environment.execute("Keyed Process Function Example");
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
Solution 1 |