'Azure Stream Analytics current day aggregation

I'm quite new in Azure Stream Analytics but I need to push to Power BI (live dashboard) rolling totals from start of the day every time when new event arrives to Azure Stream Analytics job. I've created next SQL query to calculate this

SELECT
    Factory_Id,
    COUNT(0) as events_count, 
    MAX(event_create_time) as last_event_time,
    SUM(event_value) as event_value_total
INTO
        [powerbi]
FROM
    [eventhub] TIMESTAMP BY event_create_time
WHERE DAY(event_create_time) = DAY(System.Timestamp) and MONTH(event_create_time) = MONTH(System.Timestamp) and YEAR(event_create_time) = YEAR(System.Timestamp)
    GROUP BY Factory_Id, SlidingWindow(day,1)

But this didn't give me desired result - I get total for last 24 hours(not only for current day) and some times record with bigger last_event_time has events_count smaller then record with smaller last_event_time. The question is - What I'm doing wrong and how can I achieve desired outcome?



Solution 1:[1]

EDIT following comment: This computes the results for the last 24h, but what's needed is the running sum/count to day (from 00:00 until now). See updated answer below.

I'm wondering if an analytics approach would work better than an aggregation here.

Instead of using a time window, you calculate and emit a record for each event in input:

SELECT
    Factory_Id,
    COUNT(*) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS events_count,
    system.timestamp() as last_event_time,
    SUM(event_value) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) as event_value_total
INTO PowerBI
FROM [eventhub] TIMESTAMP BY event_create_time

The only hiccup is for events landing on the same time stamp:

{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:00:00", "event_value" : 0.1}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 2}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 10}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:02:00", "event_value" : 0.2}

You won't get a single record on that timestamp:

Factory_Id events_count last_event_time event_value_total
1 1 2021-12-10T10:00:00.0000000Z 0.1
1 2 2021-12-10T10:01:00.0000000Z 2.1
1 3 2021-12-10T10:01:00.0000000Z 12.1
1 4 2021-12-10T10:02:00.0000000Z 12.2

We may want to add a step to the query to deal with it if it's an issue for your dashboard. Let me know!

EDIT following comment

This new version will emit progressive results on a daily tumbling window. To do that, every time we get a new record, we collect the last 24h. Then we remove the rows from the previous day, and re-calculate the new aggregates. To collect properly, we first need to make sure we only have 1 record per timestamp.

-- First we make sure we get only 1 record per timestamp, to avoid duplication in the analytics function below
WITH Collapsed AS (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COUNT(*) AS C,
        SUM(event_value) AS S
    FROM [input1] TIMESTAMP BY event_create_time
    GROUP BY Factory_Id, system.timestamp()
),

-- Then we build an array at each timestamp, containing all records from the last 24h
Collected as (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COLLECT() OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS all_events
    FROM Collapsed
)

-- Finally we expand the array, removing the rows on the previous day, and aggregate
SELECT
    C.Factory_Id,
    system.timestamp() as last_event_time,
    SUM(U.ArrayValue.C) AS events_count,
    SUM(U.ArrayValue.S) AS event_value_total
FROM Collected AS C
CROSS APPLY GETARRAYELEMENTS(C.all_events) AS U
WHERE DAY(U.ArrayValue.last_event_time) = DAY(system.Timestamp())
GROUP BY C.Factory_Id, C.last_event_time, system.timestamp()

Let me know how it goes.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1