TASK
Implementation
Tumbling windows divide an infinite stream into fixed-size, non-overlapping time buckets. Each event belongs to exactly one window. When the window period ends, you emit the aggregate and start a fresh window.
Events:  e1(10:00:10)  e2(10:00:40)    e3(10:01:15)  e4(10:01:50)
Windows: [---- 10:00 - 10:01 ----]     [---- 10:01 - 10:02 ----]
         e1, e2 → count=2              e3, e4 → count=2
Your node handles three message types:
// Assign a single event to its window (window_size_ms = 60000 → 1-minute windows)
{ "type": "assign", "msg_id": 1,
  "events": [{"id":1,"timestamp":"2024-01-15T10:00:10Z"}],
  "window_size_ms": 60000 }
→ { "type": "assigned", "in_reply_to": 1,
    "window_id": "window-1705305600000",
    "window_start": "2024-01-15T10:00:00Z",
    "window_end": "2024-01-15T10:01:00Z" }
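The reply fields above could be derived roughly as follows. This is a minimal sketch, not the reference solution: it assumes the trailing `Z` means UTC, that timestamps carry whole seconds, and that an `assign` message holds a single event. The helper names (`to_epoch_ms`, `to_iso`, `assign_reply`) are illustrative only, and the numeric part of `window_id` depends on how the grader converts a timestamp to epoch milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumption: whole seconds, UTC


def to_epoch_ms(timestamp: str) -> int:
    """Parse an ISO-8601 UTC timestamp into epoch milliseconds."""
    parsed = datetime.strptime(timestamp, ISO_FORMAT).replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def to_iso(epoch_ms: int) -> str:
    """Format epoch milliseconds back into the same ISO-8601 shape."""
    return (EPOCH + timedelta(milliseconds=epoch_ms)).strftime(ISO_FORMAT)


def assign_reply(body: dict) -> dict:
    """Build an 'assigned' reply for the first event in the message."""
    size = body["window_size_ms"]
    event = body["events"][0]  # assumption: assign carries a single event
    start = to_epoch_ms(event["timestamp"]) // size * size
    return {
        "type": "assigned",
        "in_reply_to": body["msg_id"],
        "window_id": f"window-{start}",
        "window_start": to_iso(start),
        "window_end": to_iso(start + size),  # end = start + window size
    }


reply = assign_reply({
    "type": "assign", "msg_id": 1,
    "events": [{"id": 1, "timestamp": "2024-01-15T10:00:10Z"}],
    "window_size_ms": 60000,
})
print(reply["window_start"], reply["window_end"])
```

Using integer floor division on epoch milliseconds keeps the arithmetic exact, which avoids the rounding surprises that floating-point seconds can introduce.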
// Process a stream of events and return window aggregates
{ "type": "process_window", "msg_id": 2,
  "events": [
    {"id":1,"timestamp":"2024-01-15T10:00:10Z"},
    {"id":2,"timestamp":"2024-01-15T10:00:40Z"},
    {"id":3,"timestamp":"2024-01-15T10:01:15Z"}
  ],
  "window_size_ms": 60000 }
→ { "type": "window_result", "in_reply_to": 2,
    "windows": [
      {"window_id":"window-1705305600000","count":2,"events":[1,2]},
      {"window_id":"window-1705305660000","count":1,"events":[3]}
    ]}
Window ID formula: "window-" followed by floor(timestamp_ms / window_size_ms) * window_size_ms, which is the window start in epoch milliseconds.
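To make the formula concrete, here is a small sketch that applies it to events already expressed as epoch milliseconds and groups them per window. The function names and the `(event_id, timestamp_ms)` input shape are assumptions for illustration, and returning windows in ascending start order is a guess based on the example reply. The timestamps are chosen to land in the two windows named in that reply.

```python
from collections import defaultdict


def window_start_ms(timestamp_ms: int, window_size_ms: int) -> int:
    # Integer floor division implements floor(t / size) * size exactly.
    return timestamp_ms // window_size_ms * window_size_ms


def group_by_window(events, window_size_ms):
    """Group (event_id, timestamp_ms) pairs into tumbling-window aggregates."""
    buckets = defaultdict(list)
    for event_id, timestamp_ms in events:
        buckets[window_start_ms(timestamp_ms, window_size_ms)].append(event_id)
    # Assumption: windows are reported in ascending order of window start.
    return [
        {"window_id": f"window-{start}", "count": len(ids), "events": ids}
        for start, ids in sorted(buckets.items())
    ]


events = [(1, 1705305610000), (2, 1705305640000), (3, 1705305675000)]
windows = group_by_window(events, 60000)
for window in windows:
    print(window)
```

Because every timestamp maps to exactly one multiple of `window_size_ms`, no event can end up in two buckets.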
Sample Test Cases
Assign events to windows
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "windower",
  "body": {
    "type": "assign",
    "msg_id": 1,
    "events": [
      {
        "id": 1,
        "timestamp": "2024-01-15T10:00:10Z"
      }
    ],
    "window_size_ms": 60000
  }
}
Expected Output
{"type": "assigned", "in_reply_to": 1, "window_id": "window-1705305600000", "window_start": "2024-01-15T10:00:00Z", "window_end": "2024-01-15T10:01:00Z"}
Process multiple windows
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "windower",
  "body": {
    "type": "process_window",
    "msg_id": 1,
    "events": [
      {
        "id": 1,
        "timestamp": "2024-01-15T10:00:10Z"
      },
      {
        "id": 2,
        "timestamp": "2024-01-15T10:00:40Z"
      },
      {
        "id": 3,
        "timestamp": "2024-01-15T10:01:15Z"
      }
    ],
    "window_size_ms": 60000
  }
}
Expected Output
{"type": "window_result", "in_reply_to": 1, "windows": [{"window_id": "window-1705305600000", "count": 2, "events": [1, 2]}, {"window_id": "window-1705305660000", "count": 1, "events": [3]}]}
Hints
Hint 1
Window ID = floor(event_timestamp_ms / window_size_ms) * window_size_ms
Hint 2
Each event belongs to exactly one window; windows never overlap
Hint 3
Window end = window_start + window_size_ms
Hint 4
Aggregate events per window_id: keep a count and list of events
Hint 5
close emits the result for a window and removes it from active state
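Hints 4 and 5 suggest keeping per-window state that is discarded once a window closes. The sketch below models only that state. The wire format of a close message is not shown in the task, so the class name `TumblingWindows` and its `add` and `close` methods are hypothetical.

```python
class TumblingWindows:
    """Hypothetical in-memory state for open tumbling windows."""

    def __init__(self, window_size_ms: int):
        self.window_size_ms = window_size_ms
        self.active = {}  # window start (epoch ms) -> list of event ids

    def add(self, event_id, timestamp_ms: int) -> int:
        """Record an event in its window and return that window's start."""
        start = timestamp_ms // self.window_size_ms * self.window_size_ms
        self.active.setdefault(start, []).append(event_id)
        return start

    def close(self, start: int) -> dict:
        """Emit the aggregate for one window and drop it from active state."""
        ids = self.active.pop(start, [])
        return {"window_id": f"window-{start}", "count": len(ids), "events": ids}


state = TumblingWindows(60000)
first = state.add(1, 10_000)   # 10 s after the epoch -> window 0
state.add(2, 40_000)           # same window
state.add(3, 75_000)           # next window, starts at 60000
result = state.close(first)
print(result, sorted(state.active))
```

Popping the entry in `close` is what keeps memory bounded on an unbounded stream: only windows that are still open hold event lists.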
OVERVIEW
Key Concepts
tumbling windows, time-based windows, window aggregation, non-overlapping windows, event time
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()