Accessing the current sliding window in foreachBatch in Spark Structured Streaming

I am using foreachBatch() in Spark Structured Streaming to manually maintain a sliding window consisting of the last 200000 entries. With every microbatch I receive about 50 rows. Over this sliding window I manually calculate my desired metrics such as min, max, etc.
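The buffer maintenance I do per microbatch boils down to the following sketch (the class and its names are my own illustration; the Spark wiring inside foreachBatch is omitted):

```scala
import scala.collection.mutable

// Capped buffer holding the last `windowSize` values, oldest first.
// Each microbatch's rows are appended and the oldest entries evicted.
class SlidingBuffer(windowSize: Int) {
  private val buffer = mutable.Queue.empty[Double]

  def addBatch(rows: Seq[Double]): Unit = {
    buffer ++= rows
    while (buffer.size > windowSize) buffer.dequeue()
  }

  // Metrics are computed over the whole window, not just the batch.
  def min: Double = buffer.min
  def max: Double = buffer.max
  def size: Int = buffer.size
}
```

Inside foreachBatch I append the batch's rows to such a buffer and then read the metrics from it.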

Spark also provides a sliding window function, but I have two problems with it:

  • The interval at which the sliding window is updated can only be configured as a time period; there seems to be no way to force an update with each incoming microbatch. Is there a possibility I am missing?

  • The bigger problem: it seems I can only do aggregations using grouping, like:

    val windowedCounts = words.groupBy(
      window($"timestamp", "10 minutes", "5 minutes"),
      $"word"
    ).count()

But I do not want to group over multiple sliding windows. I need something like the existing foreachBatch() that gives me access not only to the current batch but also to the current sliding window. Is there something like that?

Thank you for your time!



Solution 1:[1]

You can probably use the flatMapGroupsWithState feature to achieve this. Basically, you can store and keep updating information from previous batches in an internal state (only the information you need) and use it in the next batch.

You can refer to the links below:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations

https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-demo-arbitrary-stateful-streaming-aggregation-flatMapGroupsWithState.html
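A sketch of that state-update logic, written as a pure function so the Spark wiring is only hinted at (all names here are hypothetical, not Spark API):

```scala
// State kept per group: the last `maxSize` values seen, oldest first.
case class WindowState(values: Vector[Double], maxSize: Int) {
  // Fold a microbatch's rows into the state, evicting the oldest entries.
  def update(batch: Iterator[Double]): WindowState =
    copy(values = (values ++ batch).takeRight(maxSize))

  // Metrics over the whole retained window.
  def min: Double = values.min
  def max: Double = values.max
}
```

In a real job, the function you pass to flatMapGroupsWithState would read the previous WindowState from the GroupState (or start empty), call update with the batch's rows, write the result back with state.update, and emit the metrics for that trigger.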

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Vindhya G