'Apache Flink Stream Event Delay
I tried to develop the following code but it doesn't work. I would like to use apache Flink to delay the event that have the time (specified in the timestamp field) different from the current date.
Sample:
Current Date: 2022-05-06 10:30
Event 1[{ "user1": "1", "user2": "2", "timestamp": "2022-05-06 10:30" } --> OK (SINK)
Event 2[{ "user1": "1", "user2": "2", "timestamp": "2022-05-06 13:30" } --> DELAY (will be sent at 13:30)
FlinkLikePipeline
public class FlinkLikePipeline {
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());
FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
new BackupSerializationSchema());
DataStream<Like> likes = environment.addSource(consumer);
DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
matches.addSink(producer);
environment.execute("Keyed Process Function Example");
}
public static void main(String[] args) throws Exception {
createBackup();
}
}
InputMessageTimestampAssigner
public class InputMessageTimestampAssigner implements
AssignerWithPunctuatedWatermarks<Like> {
@Override
public long extractTimestamp(Like element, long previousElementTimestamp) {
Timestamp now = new Timestamp(System.currentTimeMillis());
return now.getTime();
}
@Override
public Watermark checkAndGetNextWatermark(Like lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
}
MatchFunction
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {
ValueState<Like> liked;
@Override
public void open(Configuration parameters) throws Exception {
liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
}
@Override
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
Timestamp now = new Timestamp(System.currentTimeMillis());
Timestamp sendAt = newLike.getTimestamp();
if (sendAt.before(now)) {
liked.clear();
out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
} else {
// schedule the next timer by sendAt
liked.update(newLike);
ctx.timerService().registerEventTimeTimer(sendAt.getTime());
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
Like like = liked.value();
Timestamp now = new Timestamp(System.currentTimeMillis());
Timestamp sendAt = like.getTimestamp();
if (now == sendAt) {
liked.clear();
out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
}
}
}
Producer Topic Kafka (Json Sample): { "user1": "1", "user2": "2", "timestamp": "2022-05-06T11:32:00.000Z" }
Thank you for your support, Giuseppe.
Solution 1:[1]
Try this:
public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {
ValueState<Like> liked;
@Override
public void open(Configuration parameters) throws Exception {
liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
}
@Override
public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {
Timestamp now = new Timestamp(ctx.timestamp()); // CHANGE: Use context timestamp
Timestamp sendAt = newLike.getTimestamp();
if (sendAt.before(now)) {
liked.clear();
out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
} else {
// schedule the next timer by sendAt
liked.update(newLike);
ctx.timerService().registerProcessingTimeTimer(sendAt.getTime()); // CHANGED: change to processing time
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
Like like = liked.value();
Timestamp now = new Timestamp(timestamp); // CHANGE: Use current timestamp
Timestamp sendAt = like.getTimestamp();
if (now >= sendAt) { // CHANGE: Should try equals or great
liked.clear();
out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
}
}
and
public static void createBackup() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // CHANGED: to processingtime
environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());
FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
new BackupSerializationSchema());
DataStream<Like> likes = environment.addSource(consumer);
DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
matches.addSink(producer);
environment.execute("Keyed Process Function Example");
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |