Sliding window using Faust
Does anyone know how to implement a sliding window using Faust?
The idea is to count the occurrences of a key in 10, 30, 60, and 300 s windows, but we need those counts refreshed every second, or on every update.
I have a dodgy workaround, which seems very inefficient: I keep a tumbling 1 s window with an expiry of 300 s, then sum all the old values in the table up to the current one using the delta() method. It copes OK with messages from 6 sources each producing 10 messages/s, but that's about the limit before we see lag. It's obviously a slow method that can't scale up, so the question is how to achieve this without needing KSQL or setting up a Spark cluster alongside the Kafka cluster. We're trying to keep this simple if we can.
To complicate things, we would dearly love to have the same stats for the last 24 hours, 1 week, 1 month, and 3 months - all on the fly. But perhaps we're asking way too much without a dedicated process for each input.
Here's my dodgy code:
import faust


class AlarmCount(faust.Record, serializer='json'):
    event_id: int
    source_id: int
    counts_10: int
    counts_30: int
    counts_60: int
    counts_300: int


# app, events_topic, event_counts_table and event_counts_topic are assumed to be
# defined elsewhere (see the setup sketch below).
@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        # Calculate the count statistics for each window length.
        counts_10 = 0
        counts_30 = 0
        counts_60 = 0
        counts_300 = 0
        event_counts_table[value.source_id] += 1
        for i in range(300):
            delta = event_counts_table[value.source_id].delta(i)
            if i <= 10:
                counts_10 += delta
            if i <= 30:
                counts_30 += delta
            if i <= 60:
                counts_60 += delta
            counts_300 += delta
        await event_counts_topic.send(
            value=AlarmCount(
                event_id=value.event_id,
                source_id=value.source_id,
                counts_10=counts_10,
                counts_30=counts_30,
                counts_60=counts_60,
                counts_300=counts_300,
            )
        )
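For reference, a minimal sketch of the setup this snippet assumes, based on the description above. The broker address and the incoming Event schema are placeholders, not part of the original code:

import faust

app = faust.App('alarm-counts', broker='kafka://localhost:9092')

class Event(faust.Record, serializer='json'):
    event_id: int
    source_id: int

events_topic = app.topic('events', value_type=Event)
event_counts_topic = app.topic('event-counts', value_type=AlarmCount)

# 1 s tumbling window, kept around for 300 s, as described above.
event_counts_table = app.Table(
    'event-counts', default=int,
).tumbling(1.0, expires=300.0)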
Solution 1:[1]
So there doesn't seem to be a good way to do this.
The best solution I found was to store a list of timestamps for each id in a table, append the new timestamp on each event, remove the expired timestamps, and then send the length of the list as the new count to another topic.
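A minimal sketch of that approach, assuming an Event record carrying a source_id; the app, topic and table names here are illustrative, not the ones from the original code:

import time
import faust

app = faust.App('sliding-counts', broker='kafka://localhost:9092')

class Event(faust.Record, serializer='json'):
    event_id: int
    source_id: int

events_topic = app.topic('events', value_type=Event)
counts_topic = app.topic('event-counts')

# Plain (non-windowed) table: one list of event timestamps per source.
timestamps_table = app.Table('event-timestamps', default=list)

WINDOWS = (10, 30, 60, 300)

@app.agent(events_topic)
async def count_events(stream):
    async for event in stream:
        now = time.time()
        # Drop timestamps older than the largest window, then add the new event.
        stamps = [t for t in timestamps_table[event.source_id]
                  if now - t <= max(WINDOWS)]
        stamps.append(now)
        # Reassign (rather than mutate in place) so the table records the change.
        timestamps_table[event.source_id] = stamps

        counts = {
            f'counts_{w}': sum(1 for t in stamps if now - t <= w)
            for w in WINDOWS
        }
        await counts_topic.send(key=str(event.source_id), value=counts)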
The only real issue with this is that it only captures the event count per time frame when an event arrives - ideally we would get a live update of each count every second. But I don't think that is what this system is, should be, or can be used for - it's for event processing, so it needs an event. We could use a timer function to trigger a recount each second, but that gets noticeably taxing on processing speed and throughput, and since the trigger for an alert is a new event anyway, it doesn't matter too much. Nice to have, not essential.
For the longer-term stats (we're talking weeks and months here) we decided to write all events to a database, then periodically (every 10 seconds) perform a similar task of looking for new and expired events and sending the aggregated counts to a Kafka topic for additional processing. It takes only on the order of 10-20 ms to process this every 10 seconds, even at >1000 events per second, so that's manageable - 1000/s is the nightmare scenario that will only happen once in a blue moon and then stop.
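A rough sketch of such a periodic job using Faust's timer decorator. The database access is stubbed out: fetch_recent_counts is a hypothetical helper standing in for whatever query runs against the store, and the topic name is illustrative:

import faust

app = faust.App('long-term-counts', broker='kafka://localhost:9092')
stats_topic = app.topic('long-term-counts')

async def fetch_recent_counts():
    # Placeholder for the real database query; it should return something like
    # {source_id: {'24h': n, '1w': n, '1m': n, '3m': n}}.
    return {}

@app.timer(interval=10.0)
async def publish_long_term_counts():
    # Every 10 seconds, pull the aggregates from the store and publish them.
    counts = await fetch_recent_counts()
    for source_id, window_counts in counts.items():
        await stats_topic.send(key=str(source_id), value=window_counts)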
Solution 2:[2]
I would like to iterate over all of the windows to compare the last value with the mean/deviation/other aggregation of all past values - something like table[key].iter_windows() - rather than looping over every .delta(i).
Like you, I will implement a table with timestamped lists. If the list grows too large this becomes suboptimal, because the changelog gets fat: we are supposed to stream only what has been modified, not repeat the whole list on every event.
So I will keep a short-term list with the details and a long-term list with aggregations, so that only the short-term list is updated on each event, as sketched below.
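A sketch of that split under assumed names (none of them are from the original post): the short-term table holds raw timestamps for the last 300 s and is rewritten on every event, while the long-term table only stores a rolled-up counter per source and hour bucket, so changelog entries stay small.

import time
import faust

app = faust.App('split-counts', broker='kafka://localhost:9092')

class Event(faust.Record, serializer='json'):
    event_id: int
    source_id: int

events_topic = app.topic('events', value_type=Event)

# Short-term table: raw timestamps for the last 300 s, rewritten on each event.
short_term = app.Table('short-term-timestamps', default=list)
# Long-term table: one rolled-up counter per source and hour bucket.
long_term = app.Table('long-term-counts', default=int)

@app.agent(events_topic)
async def split_count(stream):
    async for event in stream:
        now = time.time()

        # Detailed short-term list: stays small, so rewriting it is cheap.
        stamps = [t for t in short_term[event.source_id] if now - t <= 300]
        stamps.append(now)
        short_term[event.source_id] = stamps

        # Long-term side: only a single integer per hour bucket is updated,
        # so the changelog message stays tiny regardless of event volume.
        hour_bucket = int(now // 3600)
        long_term[f'{event.source_id}:{hour_bucket}'] += 1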
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Fonty |
Solution 2 | ouflak |