How to drain the window after a Flink join using coGroup()?

I'd like to join data coming in from two Kafka topics ("left" and "right").

Matching records are to be joined using an ID, but if a "left" or a "right" record is missing, the other one should be passed downstream after a certain timeout. Therefore I have chosen to use the coGroup function.

This works, but there is one problem: once messages stop arriving, at least one record always stays in an internal buffer. It is only pushed out when new messages arrive; otherwise it is stuck indefinitely.

The expected behaviour is that all records should be pushed out after the configured idle timeout has been reached.

Some information which might be relevant:

  • Flink 1.14.4
  • The Flink parallelism is set to 8, so is the number of partitions in both Kafka topics.
  • Flink checkpointing is enabled
  • Event-time processing is to be used
  • Lombok is used, so val behaves like final var

Some code snippets:

Relevant join settings

public static final int AUTO_WATERMARK_INTERVAL_MS = 500;

public static final Duration SOURCE_MAX_OUT_OF_ORDERNESS = Duration.ofMillis(4000);
public static final Duration SOURCE_IDLE_TIMEOUT = Duration.ofMillis(1000);

public static final Duration TRANSFORMATION_MAX_OUT_OF_ORDERNESS = Duration.ofMillis(5000);
public static final Duration TRANSFORMATION_IDLE_TIMEOUT = Duration.ofMillis(1000);

public static final Time JOIN_WINDOW_SIZE = Time.milliseconds(1500);

Create KafkaSource

private static KafkaSource<JoinRecord> createKafkaSource(Config config, String topic) {
    val properties = KafkaConfigUtils.createConsumerConfig(config);

    val deserializationSchema = new KafkaRecordDeserializationSchema<JoinRecord>() {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<JoinRecord> out) {
            val m = JsonUtils.deserialize(record.value(), JoinRecord.class);

            val copy = m.toBuilder()
                    .partition(record.partition())
                    .build();

            out.collect(copy);
        }

        @Override
        public TypeInformation<JoinRecord> getProducedType() {
            return TypeInformation.of(JoinRecord.class);
        }
    };

    return KafkaSource.<JoinRecord>builder()
            .setProperties(properties)
            .setBootstrapServers(config.kafkaBootstrapServers)
            .setTopics(topic)
            .setGroupId(config.kafkaInputGroupIdPrefix + "-" + String.join("_", topic))
            .setDeserializer(deserializationSchema)
            .setStartingOffsets(OffsetsInitializer.latest())
            .build();
}

Create DataStreamSource

Then the DataStreamSource is built on top of the KafkaSource:

  • Configure "max out of orderness"
  • Configure "idleness"
  • Extract timestamp from record, to be used for event time processing

private static DataStreamSource<JoinRecord> createLeftSource(Config config,
                                                             StreamExecutionEnvironment env) {
    val leftKafkaSource = createLeftKafkaSource(config);

    val leftWms = WatermarkStrategy
            .<JoinRecord>forBoundedOutOfOrderness(SOURCE_MAX_OUT_OF_ORDERNESS)
            .withIdleness(SOURCE_IDLE_TIMEOUT)
            .withTimestampAssigner((joinRecord, __) -> joinRecord.timestamp.toEpochSecond() * 1000L);

    return env.fromSource(leftKafkaSource, leftWms, "left-kafka-source");
}

Use keyBy

The keyed sources are created on top of the DataStreamSource instances like this:

  • Again configure "out of orderness" and "idleness"

  • Again extract timestamp

     val leftWms = WatermarkStrategy
             .<JoinRecord>forBoundedOutOfOrderness(TRANSFORMATION_MAX_OUT_OF_ORDERNESS)
             .withIdleness(TRANSFORMATION_IDLE_TIMEOUT)
             .withTimestampAssigner((joinRecord, __) -> {
                 if (VERBOSE_JOIN)
                     log.info("Left : " + joinRecord);
                 return joinRecord.timestamp.toEpochSecond() * 1000L;
             });
    
     val leftKeyedSource = leftSource
             .keyBy(jr -> jr.id)
             .assignTimestampsAndWatermarks(leftWms)
             .name("left-keyed-source");
    

Join using coGroup

The join then combines the left and the right keyed sources:

    val joinedStream = leftKeyedSource
            .coGroup(rightKeyedSource)
            .where(left -> left.id)
            .equalTo(right -> right.id)
            .window(TumblingEventTimeWindows.of(JOIN_WINDOW_SIZE))
            .apply(new CoGroupFunction<JoinRecord, JoinRecord, JoinRecord>() {
                       @Override
                       public void coGroup(Iterable<JoinRecord> leftRecords, 
                                           Iterable<JoinRecord> rightRecords,
                                           Collector<JoinRecord> out) {
                           // Transform
                           val result = ...;

                           out.collect(result);
                       }
                   });

Write stream to console

The resulting joinedStream is written to the console:

    val consoleSink = new PrintSinkFunction<JoinRecord>();
    joinedStream.addSink(consoleSink);

  • How can I configure this join operation so that all records are pushed downstream after the configured idle timeout?
  • If it can't be done this way: Is there another option?


Solution 1:[1]

This is the expected behavior. withIdleness doesn't try to handle the case where all streams are idle. It only helps in cases where there are still events flowing from at least one source partition/shard/split.
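To see why the last record waits, it helps to work through the numbers. The sketch below is plain Java with no Flink dependency, and the class and method names are hypothetical. It assumes the usual event-time rules: a tumbling window (offset 0) fires once the watermark reaches the last millisecond of the window, and a bounded-out-of-orderness watermark trails the highest timestamp seen by the bound plus 1 ms. The values in the usage note are the question's JOIN_WINDOW_SIZE (1500 ms) and TRANSFORMATION_MAX_OUT_OF_ORDERNESS (5000 ms).

```java
// Hypothetical helper for reasoning about when an event-time window can fire.
final class WindowFiring {
    // Start of the tumbling window (offset 0) containing a non-negative timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Watermark a bounded-out-of-orderness generator emits after seeing maxTimestampMs.
    static long watermarkFor(long maxTimestampMs, long outOfOrdernessMs) {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    // Smallest later event timestamp that pushes the watermark far enough
    // to fire the window containing timestampMs.
    static long minTimestampToFire(long timestampMs, long windowSizeMs, long outOfOrdernessMs) {
        long lastMsInWindow = windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
        return lastMsInWindow + outOfOrdernessMs + 1;
    }
}
```

For a record with timestamp 10000, `WindowFiring.minTimestampToFire(10000, 1500, 5000)` yields 15500: the window covering 9000 to 10499 fires only after some later record with a timestamp of at least 15500 has been seen. If no such record ever arrives, the watermark stays put and the window stays open.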

To get the behavior you desire (in the context of a continuous streaming job), you'll have to implement a custom watermark strategy that advances the watermark based on a processing time timer. Here's an implementation that uses the legacy watermark API.
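Separately from the linked implementation, the core idea can be sketched in plain Java. This is a model under stated assumptions, not Flink code: the class name, fields, and the injectable clock are hypothetical, and the two methods only mirror the shape of the onEvent and onPeriodicEmit callbacks of Flink's WatermarkGenerator interface. While events flow, the watermark trails the highest timestamp as usual; once no event has arrived for longer than the idle timeout, the watermark advances with processing time.

```java
import java.util.function.LongSupplier;

// Plain-Java model (no Flink dependency) of a watermark generator that keeps
// advancing from a processing-time clock once the input goes idle.
// All names here are hypothetical.
final class IdleAdvancingWatermarks {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock;            // processing time in ms, injectable for tests

    private boolean seenEvent = false;
    private long maxEventTimestampMs;            // highest event timestamp seen so far
    private long lastEventSeenAtMs;              // processing time when the last event arrived
    private long lastEmittedMs = Long.MIN_VALUE; // last watermark handed out

    IdleAdvancingWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    // Called for every record: track the highest timestamp and when it arrived.
    void onEvent(long eventTimestampMs) {
        maxEventTimestampMs = seenEvent
                ? Math.max(maxEventTimestampMs, eventTimestampMs)
                : eventTimestampMs;
        lastEventSeenAtMs = clock.getAsLong();
        seenEvent = true;
    }

    // Called on every periodic tick: returns the watermark to emit.
    long onPeriodicEmit() {
        if (!seenEvent) {
            return lastEmittedMs;                // nothing to base a watermark on yet
        }
        long watermark = maxEventTimestampMs - maxOutOfOrdernessMs - 1;
        long idleForMs = clock.getAsLong() - lastEventSeenAtMs;
        if (idleForMs > idleTimeoutMs) {
            // Idle: let event time follow the clock for the time beyond the timeout.
            watermark += idleForMs - idleTimeoutMs;
        }
        lastEmittedMs = Math.max(lastEmittedMs, watermark); // never move backwards
        return lastEmittedMs;
    }
}
```

In a real job this logic would presumably live in a class implementing WatermarkGenerator and be plugged in through WatermarkStrategy.forGenerator(...). Note the trade-off: once the watermark has been advanced by the clock, records that arrive afterwards with older timestamps may be treated as late.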

On the other hand, if the job is complete and you just want to drain the final results before shutting it down, you can use the --drain option when you stop the job. It sends a final MAX_WATERMARK through the pipeline, which fires all pending event-time windows. If you use bounded sources, this happens automatically when the input ends.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
