Inconsistent results when joining multiple tables in Flink

We have 4 CDC sources defined, whose data we need to combine into one result table. We create a table for each source using the SQL API, e.g.:

"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
"    WRK_SDL_DEF_NO     STRING,\n" +
"    HTR_FROM_DT        BIGINT,\n" +
...
"    update_time        TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
"    PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
"    WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
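For completeness, a minimal sketch of how such a DDL string can be registered with the Table API. The environment setup and the connector option shown are assumptions for illustration, not taken from the original post:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterCdcTable {
    public static void main(String[] args) {
        // Streaming-mode table environment (assumed setup)
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the CDC-backed table; the connector option is a placeholder
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS PAA31 (\n" +
                "    WRK_SDL_DEF_NO     STRING,\n" +
                "    HTR_FROM_DT        BIGINT,\n" +
                "    update_time        TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
                "    PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
                "    WATERMARK FOR update_time AS update_time\n" +
                ") WITH (\n" +
                "    'connector' = 'pulsar',  -- placeholder, depends on your setup\n" +
                "    'value.format' = 'debezium-json'\n" +
                ")");
    }
}
```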

After we've defined each table, we create a new table by running the following query:

"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
"       PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";

Note that some lines have been left out for formatting reasons.

The issue we're running into is that executing this exact job produces inconsistent outcomes: sometimes we get 1750 resulting rows (correct), but most of the time the number of resulting rows is lower, and it varies from run to run.

This is the plan overview for the job in Flink. The number of records sent from the sources is correct, but the number of records sent from the first join statement is not:

[Image: Flink job execution plan and record counts]

What could be the cause and how can we have consistent joining of all data sources?



Solution 1:[1]

We've been able to get consistent results, even for bigger datasets, by enabling MiniBatch Aggregation:

configuration.setString("table.exec.mini-batch.enabled", "true"); 
configuration.setString("table.exec.mini-batch.allow-latency", "500 ms"); 
configuration.setString("table.exec.mini-batch.size", "5000");

This seems to fix the consistency issue for both the local filesystem connector as well as for the Flink Pulsar connector.
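These options can also be set through the table environment's configuration using the typed option constants. A sketch, assuming a `TableEnvironment` created elsewhere in the job (this is an equivalent restatement of the settings above, not additional configuration):

```java
import java.time.Duration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class MiniBatchConfig {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same settings as the string-based variant, via typed config options
        tableEnv.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
        tableEnv.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(500));
        tableEnv.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
    }
}
```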

From these findings, it seems Flink was having issues with the overhead of state management at our throughput. We'll still need to assess realistic CDC initial-load processing, but so far enabling MiniBatch Aggregation seems promising.

Thanks @david-anderson for thinking along with us and helping to figure this out.

Solution 2:[2]

I see that your pipeline includes an event time window with a processing time trigger, and does watermarking with zero tolerance for out-of-order events. These could be causing problems.

Flink can only produce completely correct, deterministic results for streaming workloads that involve event time logic if there are no late events. Late events can occur whenever processing time logic interferes with the watermarking, e.g.,

  • if the watermark generator is incorrectly configured, and doesn't account for the actual out-of-orderness
  • if idleness detection is used, and an inactive stream becomes re-activated
  • after a restart (or recovery, or rescaling) occurs

Just guessing, however. Would need to see more details to give a more informed answer. A minimal, reproducible example would be ideal.

Update:

It's also the case that streaming jobs won't emit their last set of results unless something is done to provoke them to do so. In this case you could, for example, use

./bin/flink stop $JOB_ID --drain --savepointPath /tmp/flink-savepoints

to force a large watermark to be emitted that will close the last window.

Update 2:

Regular joins don't produce results with time attributes or watermarks. This is because it's impossible to guarantee that the results will be emitted in any particular order, so meaningful watermarking isn't possible. Normally it's not possible to apply event time windowing after such a join.

Update 3:

Having now studied the latest code, it's clear this doesn't have anything to do with watermarks.

If I understand correctly, the issue is that while the results always include what should be produced, there are varying numbers of additional output records. I can suggest two possible causes:

(1) When Flink is used with Debezium server there's the possibility of duplicate events. I don't think this is the explanation, but it is something to be aware of.

(2) The result of the join is non-deterministic (it varies from run to run). This is happening because the various input streams are racing against each other, and the exact order in which related events from different streams are ingested is affecting how the results are produced.

The result of the join is a changelog stream. I suspect that when the results are perfect, no retractions occurred, while in the other cases some preliminary results are produced that are later updated.

If you examine the ROW_KIND information in the output stream you should be able to confirm if this guess is correct.
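One way to inspect the row kinds is to bridge the joined table to a changelog `DataStream` and print each row's `RowKind`. A sketch, assuming the join query from the question (the query string here is a placeholder and the environment setup is not from the original post):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class InspectRowKinds {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder: the actual join query from the question goes here,
        // after the PAA30..PAA33 tables have been registered.
        Table resultTable = tableEnv.sqlQuery(
                "SELECT PAA30.WRK_SDL_DEF_NO as id FROM PAA30"); // placeholder

        // toChangelogStream keeps the change flag on each Row; printing it
        // shows whether retractions (-U/-D) occur between runs.
        tableEnv.toChangelogStream(resultTable)
                .map((Row row) -> row.getKind().shortString() + " " + row)
                .print();

        env.execute("inspect-row-kinds");
    }
}
```

If the runs that produce the correct count show only `+I` rows while the others also show `-U`/`+U` pairs, that would confirm the preliminary-results hypothesis above.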

I'm not very familiar with the Pulsar connector, but I'm guessing you should be using the upsert-pulsar sink.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: tdebaets