Kinesis Analytics SQL query to narrow down the sensors that are not sending data

Context: We use Kinesis Analytics to process our sensor data and find anomalies in it.

Goal: We need to identify the sensors that haven't sent data for the past X minutes.

We have tried the following approaches with Kinesis Analytics SQL, with no luck:

  • The Stagger Window technique works for the first three test cases, but doesn't work for test case 4.

    CREATE OR REPLACE PUMP "STREAM_PUMP_ALERT_DOSCONNECTION" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM"
        SELECT STREAM "deviceID" as "device_key", count("deviceID") as "device_count", ROWTIME as "time"
        FROM "INTERMEDIATE_SQL_STREAM_FOR_ROOM"
        WINDOWED BY STAGGER (
            PARTITION BY "deviceID", ROWTIME RANGE INTERVAL '1' MINUTE);
  • The LEFT JOIN and GROUP BY queries below don't work for all the test cases.

Query 1:

    CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
        SELECT STREAM
        ROWTIME as "resultrowtime",
        Input2."device_key" as "device_key",
    FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
    OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
    LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
    ON
        Input1."device_key" = Input2."device_key"
        AND Input1.ROWTIME <> Input2.ROWTIME;

Query 2:

    CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
        SELECT STREAM
        ROWTIME as "resultrowtime",
        Input2."device_key" as "device_key"
    FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
    OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
    LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
    ON
        Input1."device_key" = Input2."device_key"
        AND TSDIFF(Input1, Input2) > 0;

Query 3:

    CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
        SELECT STREAM
        ROWTIME as "resultrowtime",
        Input2."device_key" as "device_key"
    FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
    OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
    LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
    ON
        Input1."device_key" = Input2."device_key"
        AND Input1.ROWTIME = Input2.ROWTIME;

    CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP2" AS
        INSERT INTO "DIS_CONN_DEST_SQL_STREAM_ALERT"
            SELECT STREAM "device_key", "count"
            FROM (
                SELECT STREAM
                    "device_key",
                    COUNT(*) as "count"
                FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM2
                GROUP BY FLOOR(INTERMEDIATE_SQL_STREAM_FOR_ROOM2.ROWTIME TO MINUTE), "device_key"
                )
            WHERE "count" = 1;

Here are test cases for your reference:

Test case 1:

  • Device 1 and Device 2 send data continuously to Kinesis Analytics.
  • After X minutes, Device 2 continues to send data, but Device 1 stops sending data.

Output for test case 1:

We want Kinesis Analytics to emit Device 1, so that we know which device is not sending data.

Test case 2 (Interval - 10 minutes)

  • Device 1 sends data at 09:00
  • Device 2 sends data at 09:02
  • Device 2 sends data again at 09:11, but Device 1 doesn't send any more data.

Output for test case 2:

We want Kinesis Analytics to emit Device 1, so that we know which device is not sending data.

Test case 3 (Interval - 10 minutes)

  • Device 1 and Device 2 send data continuously to Kinesis Analytics.
  • Then neither device sends any data for the next 15 minutes.

Output for test case 3:

We want Kinesis Analytics to emit Device 1 and Device 2, so that we know which devices are not sending data.

Test case 4 (Interval - 10 minutes)

  • Device 1 sends data at 09:00
  • Device 2 sends data at 09:02
  • Device 1 sends data again at 09:04
  • Device 2 sends data again at 09:06
  • Then no more data arrives.

Output for test case 4:

We want Kinesis Analytics to emit Device 1 at 09:14 and Device 2 at 09:16, so that we detect each disconnected device (i.e. a device that has stopped sending data) exactly one interval after its last reading.

Note: AWS Support pointed us to simple queries that don't answer the question; it appears they can help with the exact query only if we upgrade our support plan.



Solution 1:[1]

I'm not familiar with all of the ways in which AWS has extended or modified Apache Flink, but open source Flink doesn't provide a simple way to detect that all sources have ceased to send data. One solution is to use something like a process function with processing-time timers to detect the absence of data.
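
For illustration, a minimal sketch of that approach could look like the Flink KeyedProcessFunction below. It assumes the stream has already been keyed by device and that a SensorReading type with a getDeviceId() accessor exists; those names, like InactivityDetector itself, are hypothetical and not part of the original answer.

    // Sketch only: SensorReading is an assumed record type for one sensor message,
    // and the detection interval (e.g. 10 minutes) is passed in as milliseconds.
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class InactivityDetector
            extends KeyedProcessFunction<String, SensorReading, String> {

        private final long timeoutMillis;                 // e.g. 10 * 60 * 1000
        private transient ValueState<Long> pendingTimer;  // timestamp of the currently scheduled timer

        public InactivityDetector(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public void open(Configuration parameters) {
            pendingTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pendingTimer", Long.class));
        }

        @Override
        public void processElement(SensorReading reading, Context ctx, Collector<String> out)
                throws Exception {
            // A reading arrived for this device: cancel the previously scheduled
            // timer (if any) and schedule a new one timeoutMillis from now.
            Long previous = pendingTimer.value();
            if (previous != null) {
                ctx.timerService().deleteProcessingTimeTimer(previous);
            }
            long next = ctx.timerService().currentProcessingTime() + timeoutMillis;
            ctx.timerService().registerProcessingTimeTimer(next);
            pendingTimer.update(next);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            // The timer fired without being cancelled, so no reading arrived for this
            // device during the interval: emit its key as a disconnection alert.
            out.collect(ctx.getCurrentKey());
        }
    }

Keying the stream by device before applying the function (for example, readings.keyBy(r -> r.getDeviceId()).process(new InactivityDetector(10 * 60 * 1000))) gives each device its own independent timer, which matches the per-device alert times asked for in test case 4.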

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 David Anderson