TASK
Implementation
Build a stream processor with windowing. Support tumbling and sliding windows using event-time processing.
Sample Test Cases
Add event to window
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"stream_event","msg_id":2,"event":{"data":"click","value":1},"timestamp":5,"window_size":10}}
Expected Output
{"src":"n1","dest":"c0","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}
{"src":"n1","dest":"c1","body":{"type":"stream_event_ok","in_reply_to":2,"msg_id":1,"window_key":0,"window_events":1}}
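The sample exchange can be reproduced by a small line-oriented node. This is a sketch under assumptions the task does not state: one JSON message per line, a per-node `msg_id` counter starting at 0 (which matches the expected output), and tumbling windows keyed by `(window_size, window_key)` with `window_key = timestamp // window_size`. The names `Node`, `reply`, `handle`, and `run` are hypothetical.

```python
import json
from collections import defaultdict


class Node:
    """Hypothetical minimal node for the stream_event messages in the sample."""

    def __init__(self):
        self.node_id = None
        self.next_msg_id = 0  # assumption: replies are numbered 0, 1, 2, ...
        # (window_size, window_key) -> events seen in that tumbling window
        self.windows = defaultdict(list)

    def reply(self, request, msg_type, **fields):
        # Field order mirrors the expected output: type, in_reply_to, msg_id, extras.
        body = {"type": msg_type,
                "in_reply_to": request["body"]["msg_id"],
                "msg_id": self.next_msg_id}
        body.update(fields)
        self.next_msg_id += 1
        return {"src": self.node_id, "dest": request["src"], "body": body}

    def handle(self, request):
        body = request["body"]
        if body["type"] == "init":
            self.node_id = body["node_id"]
            return self.reply(request, "init_ok")
        if body["type"] == "stream_event":
            size = body["window_size"]
            key = body["timestamp"] // size  # index of the tumbling window
            self.windows[(size, key)].append(body["event"])
            return self.reply(request, "stream_event_ok",
                              window_key=key,
                              window_events=len(self.windows[(size, key)]))
        raise ValueError(f"unsupported message type: {body['type']}")


def run(lines, out):
    """Handle one JSON message per input line; write one compact JSON reply per line."""
    node = Node()
    for line in lines:
        if line.strip():
            response = node.handle(json.loads(line))
            out.write(json.dumps(response, separators=(",", ":")) + "\n")
            out.flush()
```

To run it as a process, call `run(sys.stdin, sys.stdout)` after importing `sys`. Because `window_events` counts every event seen so far in the window, a second event with a timestamp between 0 and 9 and the same `window_size` would report 2.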
Hints
Hint 1
Tumbling vs sliding windows
Hint 2
Handle late arrivals
Hint 3
Watermarks for event-time
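The difference behind Hint 1 shows up in how a single timestamp maps to windows. A minimal sketch, assuming windows are half-open intervals `[start, end)` aligned to multiples of the window size (tumbling) or of the slide (sliding); the function names are hypothetical.

```python
def tumbling_window(timestamp, size):
    """Return the single tumbling window [start, end) that contains timestamp.

    Assumes windows are aligned to multiples of `size`.
    """
    start = (timestamp // size) * size
    return (start, start + size)


def sliding_windows(timestamp, size, slide):
    """Return every sliding window [start, end) that contains timestamp.

    Assumes window starts are multiples of `slide`, so an event falls into
    up to ceil(size / slide) overlapping windows.
    """
    windows = []
    start = (timestamp // slide) * slide  # latest window start <= timestamp
    while start > timestamp - size:       # this window still covers timestamp
        windows.append((start, start + size))
        start -= slide
    windows.reverse()  # earliest window first
    return windows
```

For the sample event, `tumbling_window(5, 10)` returns `(0, 10)`, the window with index `5 // 10 = 0`. With `slide == size` the sliding case reduces to tumbling; with `slide < size` each event lands in several overlapping windows, e.g. `sliding_windows(5, 10, 5)` gives `[(0, 10), (5, 15)]`.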
Resources
OVERVIEW
Theoretical Hub
Stream Processing
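Unlike batch processing, which works on a bounded dataset, stream processing handles data continuously as it arrives. Windows divide the unbounded stream into finite time ranges so that events can be aggregated. In event-time processing, watermarks estimate how far event time has progressed, which lets the processor decide when a window is complete and how to treat data that arrives late.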
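One common way to realize the watermark idea is a bounded-out-of-orderness watermark. The sketch below assumes that strategy (the task does not specify one): the watermark is the largest event timestamp seen minus a fixed `max_delay`, a tumbling window `[start, end)` fires once the watermark reaches `end`, and events whose window end the watermark has already passed are set aside as late. The class `WatermarkedTumblingWindows` and its parameters are hypothetical.

```python
from collections import defaultdict


class WatermarkedTumblingWindows:
    """Event-time tumbling windows driven by a watermark.

    Assumed strategy (bounded out-of-orderness): watermark = largest event
    timestamp seen so far minus `max_delay`. A window [start, end) fires once
    the watermark reaches `end`; an event whose window end the watermark has
    already passed is late and is kept in `self.late` instead of aggregated.
    """

    def __init__(self, size, max_delay):
        self.size = size
        self.max_delay = max_delay
        self.max_timestamp = None
        self.open = defaultdict(list)  # window start -> buffered events
        self.late = []                 # (timestamp, event) pairs that missed their window

    def watermark(self):
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.max_delay

    def add(self, event, timestamp):
        """Buffer one event; return [(start, end, events), ...] for windows that fired."""
        start = (timestamp // self.size) * self.size
        wm = self.watermark()
        if wm is not None and start + self.size <= wm:
            # The watermark already passed this window's end: the event is late.
            self.late.append((timestamp, event))
            return []
        self.open[start].append(event)
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp
        return self._fire()

    def _fire(self):
        # Emit, in start order, every open window whose end <= watermark.
        wm = self.watermark()
        fired = []
        for start in sorted(self.open):
            end = start + self.size
            if end > wm:
                break  # later windows end even later, so none of them can fire
            fired.append((start, end, self.open.pop(start)))
        return fired
```

With `size=10` and `max_delay=5`, an event at timestamp 8 that arrives after one at 12 is still accepted, because the watermark is only 7. Window `[0, 10)` fires when an event at 16 raises the watermark to 11, and a later event at timestamp 3 is then recorded as late. Raising `max_delay` tolerates more disorder at the cost of emitting results later.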
Key Concepts
streaming, windowing, exactly-once
main.py
#!/usr/bin/env python3
import time
from collections import defaultdict


class StreamProcessor:
    """Tumbling-window processor: each event belongs to exactly one window."""

    def __init__(self, window_size=10):
        self.window_size = window_size
        # Maps window key -> list of events assigned to that window.
        self.windows = defaultdict(list)

    def process(self, event):
        # TODO: Add to appropriate window
        pass

    def get_window_key(self, timestamp):
        # Tumbling windows are [k * size, (k + 1) * size); k is the window key.
        return timestamp // self.window_size

    def close_window(self, window_key):
        # TODO: Aggregate and emit window result
        pass


class SlidingWindow:
    """Sliding windows of length `size` that advance by `slide`."""

    def __init__(self, size, slide):
        self.size = size
        self.slide = slide
        self.events = []

    def add(self, event, timestamp):
        # TODO: Add event, expire old events
        pass

    def get_windows(self, timestamp):
        # TODO: Return all active windows containing timestamp
        pass
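One possible way to fill in the starter skeleton is sketched below. It rests on assumptions the task does not state: each `event` passed to `StreamProcessor.process` is a dict with `"timestamp"` and `"value"` keys (a hypothetical shape combining the sample's `timestamp` field and its `event.value`), `close_window` reports a count and a sum, and `SlidingWindow` receives events in timestamp order so old events can be expired eagerly. The return shapes and the helper `_first_start` are hypothetical.

```python
from collections import defaultdict


class StreamProcessor:
    """Tumbling windows keyed by timestamp // window_size.

    Assumption: `event` is a dict with "timestamp" and "value" keys.
    """

    def __init__(self, window_size=10):
        self.window_size = window_size
        self.windows = defaultdict(list)

    def process(self, event):
        # Append the event to its tumbling window; report (key, events in window).
        key = self.get_window_key(event["timestamp"])
        self.windows[key].append(event)
        return key, len(self.windows[key])

    def get_window_key(self, timestamp):
        return timestamp // self.window_size

    def close_window(self, window_key):
        # Hypothetical aggregate: remove the window and report count and sum of values.
        events = self.windows.pop(window_key, [])
        return {"window_key": window_key,
                "count": len(events),
                "sum": sum(e.get("value", 0) for e in events)}


class SlidingWindow:
    """Windows [start, start + size) whose starts are multiples of `slide`.

    Assumption: events arrive in timestamp order, so old events can be expired eagerly.
    """

    def __init__(self, size, slide):
        self.size = size
        self.slide = slide
        self.events = []  # (timestamp, event) pairs still inside some active window

    def _first_start(self, timestamp):
        # Smallest multiple of `slide` strictly greater than timestamp - size.
        return ((timestamp - self.size) // self.slide + 1) * self.slide

    def add(self, event, timestamp):
        self.events.append((timestamp, event))
        cutoff = self._first_start(timestamp)
        # Drop events older than the earliest window that still contains `timestamp`.
        self.events = [(t, e) for t, e in self.events if t >= cutoff]

    def get_windows(self, timestamp):
        # Return (start, end, events) for every window that contains `timestamp`.
        windows = []
        start = self._first_start(timestamp)
        while start <= timestamp:
            end = start + self.size
            members = [e for t, e in self.events if start <= t < end]
            windows.append((start, end, members))
            start += self.slide
        return windows
```

If events can arrive out of order, the eager expiry in `add` would discard data that a later window still needs; in that case expiry should be driven by a watermark rather than by the latest timestamp.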