How to take only the first instance of data in Spark Structured Streaming according to multiple fields?

I am working on a Spark Structured Streaming pipeline that reads in roughly 20,000 rows of data every five minutes. The vast majority of this data is 'duplicate', defined as having identical 'create_time', 'id', and 'property' values (the many other columns may vary). If any one of these three values changes I want to pass the record along and then begin detecting duplicates for that new combination, meaning the pipeline should only end up keeping maybe 10 to 20 of the rows that arrive in a given five-minute period.

Unfortunately I have no control over the data source, and I have very limited input into how the Spark cluster itself is managed. This pipeline used to live in NiFi, where I could use a distributed cache to store a hash of 'create_time' + 'id' + 'property'; every incoming record was compared against this cache, all duplicates were dropped, and any new combination was added to the cache, and so on. Any ideas on how I could achieve a similar result in Spark?
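
For reference, the NiFi logic boiled down to something like this (a plain Python sketch of the semantics, not actual NiFi code):

seen = set()  # stand-in for NiFi's distributed cache

def keep(record) -> bool:
    # The first record for a (create_time, id, property) combination passes;
    # every later record with the same combination is dropped.
    key = (record["create_time"], record["id"], record["property"])
    if key in seen:
        return False
    seen.add(key)
    return True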

I've been banging my head against the wall with the dropDuplicates API, but whenever I think I'm getting close it blows out my Java heap, since a single 'create_time' + 'id' + 'property' grouping could in theory last for hours. Here is where I'm at right now, trying to make it work for even a one-minute period:

# the one-minute watermark is just a test value
df \
    .withWatermark("spark_ingest_time", "1 minute") \
    .dropDuplicates(["create_time", "id", "property"])
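
Reading the docs more closely, I think the heap problem is that plain dropDuplicates only evicts state when the watermarked column is part of the dedup subset, and mine isn't, so every key combination is kept forever. Spark 3.5 added dropDuplicatesWithinWatermark for apparently exactly this gap. If I could upgrade, I imagine it would look something like the following (the four-hour delay is just my guess at the longest a duplicate run can last):

deduped = df \
    .withWatermark("spark_ingest_time", "4 hours") \
    .dropDuplicatesWithinWatermark(["create_time", "id", "property"])

My understanding is that state for each combination is then dropped once the watermark moves past it, so memory stays bounded.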

I fear I'm going down the wrong path with this, and that the dropDuplicates API was not intended for my use case. But maybe I'm thinking about the watermark incorrectly? Is there a way to use dropDuplicates and watermarking to keep my duplicate values in state, or should I look into creating some sort of dynamic lookup table that is limited in size?
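
To make the lookup-table idea concrete, here is the direction I have been sketching with arbitrary stateful processing via applyInPandasWithState (Spark 3.4+). The column types, the four-hour timeout, and carrying only the three key columns through are all placeholders; the real job would pass every column along:

from typing import Any, Iterator, Tuple
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

output_schema = "create_time TIMESTAMP, id STRING, property STRING"  # assumed types
state_schema = "seen BOOLEAN"

def emit_first_only(
    key: Tuple[Any, ...], pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # No duplicates for this combination in four hours: forget it.
        state.remove()
        return
    is_new_key = not state.exists
    for pdf in pdfs:
        if is_new_key and len(pdf) > 0:
            # Emit only the very first row ever seen for this combination.
            yield pdf.iloc[:1][["create_time", "id", "property"]]
            is_new_key = False
    state.update((True,))
    # Keep the key alive while duplicates are still flowing in.
    state.setTimeoutDuration(4 * 60 * 60 * 1000)  # 4 hours, in ms

deduped = df.groupBy("create_time", "id", "property").applyInPandasWithState(
    emit_first_only,
    outputStructType=output_schema,
    stateStructType=state_schema,
    outputMode="update",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)

The timeout would play the role of the size limit on the lookup table: any combination that goes quiet gets evicted, so state only ever holds the combinations that are actively producing duplicates. Is this a reasonable direction, or am I overcomplicating it?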


