TASK
Implementation
Events in a distributed stream do not always arrive in the order they occurred. A click at 10:00:00 may arrive after a click at 10:00:05 due to network delays. Without handling this, the 10:00:00 event gets dropped when its window has already closed.
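Watermarks solve this: the watermark represents the point in event time up to which the processor believes it has seen all data. Here it is computed as the largest event time seen so far minus allowed_lateness, so it advances as events with later timestamps arrive and never moves backward. A window closes only once the watermark passes its end boundary.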
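Below is a minimal Python sketch of this rule. It replays the arrival order from the walkthrough that follows and assumes each event is classified against the watermark as it stood before that event was observed. `WatermarkTracker` and its methods are hypothetical names and are not part of the exercise's message interface.

```python
from datetime import datetime, timedelta
from typing import Optional


class WatermarkTracker:
    """Hypothetical helper: watermark = max event time seen - allowed lateness."""

    def __init__(self, allowed_lateness: timedelta) -> None:
        self.allowed_lateness = allowed_lateness
        self.max_event_time: Optional[datetime] = None

    def observe(self, event_time: datetime) -> None:
        # Only a later timestamp raises the maximum, so the watermark never decreases.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None  # nothing observed yet
        return self.max_event_time - self.allowed_lateness

    def is_late(self, event_time: datetime) -> bool:
        # Late means strictly behind the current watermark.
        wm = self.watermark
        return wm is not None and event_time < wm


# Replay the arrival order from the walkthrough (allowed_lateness = 30s).
base = datetime(2024, 1, 15, 10, 0, 0)
arrivals = [base + timedelta(seconds=s) for s in (10, 30, 0, 60)]

tracker = WatermarkTracker(timedelta(seconds=30))
results = []
for t in arrivals:
    late = tracker.is_late(t)  # classify before this event can move the watermark
    tracker.observe(t)
    results.append((t, tracker.watermark, late))
    status = "late" if late else "on time"
    print(f"{t:%H:%M:%S} -> watermark {tracker.watermark:%H:%M:%S}, {status}")
```

All four arrivals come out on time. The out-of-order 10:00:00 event sits exactly at the watermark, not strictly behind it, so it is not late. An event at 09:59:59 arriving at that same point would be.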
allowed_lateness = 30s
Events arriving:
10:00:10 -> watermark = 10:00:10 - 30s = 09:59:40
10:00:30 -> watermark = 10:00:30 - 30s = 10:00:00
10:00:00 -> out of order, but event_time (10:00:00) is not < watermark (10:00:00) -> on time, accepted
10:01:00 -> watermark = 10:01:00 - 30s = 10:00:30
Your node handles two message types:
// Compute the watermark given the max timestamp seen
{ "type": "watermark", "msg_id": 1,
  "max_timestamp": "2024-01-15T10:00:00Z",
  "allowed_lateness_ms": 30000 }
-> { "type": "watermark", "in_reply_to": 1,
     "watermark": "2024-01-15T09:59:30Z" }
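Here is one possible handler for the `watermark` message, given as a sketch. It parses timestamps as UTC ISO 8601 strings and subtracts the lateness as a `timedelta`. The function names are illustrative. One practical detail: `datetime.fromisoformat` accepts a trailing `Z` only from Python 3.11 onward, so the sketch rewrites it to `+00:00` first. The sketch emits milliseconds only when the result has a fractional part. That choice is an assumption, since the samples show only whole seconds.

```python
from datetime import datetime, timedelta, timezone


def parse_ts(ts: str) -> datetime:
    # fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
    # so normalise it to an explicit UTC offset first.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def format_ts(dt: datetime) -> str:
    # Render in UTC with a "Z" suffix; milliseconds only if non-zero (assumed format).
    dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    spec = "milliseconds" if dt.microsecond else "seconds"
    return dt.isoformat(timespec=spec) + "Z"


def handle_watermark(body: dict) -> dict:
    # Watermark = max_timestamp - allowed_lateness_ms (Hint 1).
    max_ts = parse_ts(body["max_timestamp"])
    lateness = timedelta(milliseconds=body["allowed_lateness_ms"])
    return {
        "type": "watermark",
        "in_reply_to": body["msg_id"],
        "watermark": format_ts(max_ts - lateness),
    }


print(handle_watermark({
    "type": "watermark",
    "msg_id": 1,
    "max_timestamp": "2024-01-15T10:00:00Z",
    "allowed_lateness_ms": 30000,
}))
# -> {'type': 'watermark', 'in_reply_to': 1, 'watermark': '2024-01-15T09:59:30Z'}
```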
// Process an event: determine whether it is late or on time
{ "type": "process", "msg_id": 2,
  "event": {"id": 1},
  "event_time": "2024-01-15T10:00:00Z",
  "watermark": "2024-01-15T10:00:30Z" }
-> { "type": "late_event", "in_reply_to": 2,
     "event_id": 1, "handled": "dropped" }
An event is late when event_time < watermark (strictly earlier). In this exercise, late events are dropped.
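The late check reduces to a single strict comparison. This sketch assumes both timestamps use the same UTC `Z` format as the samples. The `on_time_event` reply is a hypothetical placeholder, because the message spec defines only the `late_event` reply.

```python
from datetime import datetime


def parse_ts(ts: str) -> datetime:
    # fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def handle_process(body: dict) -> dict:
    event_time = parse_ts(body["event_time"])
    watermark = parse_ts(body["watermark"])
    event_id = body["event"]["id"]
    if event_time < watermark:
        # Strictly behind the watermark: late, and this exercise drops it.
        return {"type": "late_event", "in_reply_to": body["msg_id"],
                "event_id": event_id, "handled": "dropped"}
    # Hypothetical reply shape: the spec only defines the late_event reply.
    return {"type": "on_time_event", "in_reply_to": body["msg_id"],
            "event_id": event_id}


print(handle_process({
    "type": "process",
    "msg_id": 2,
    "event": {"id": 1},
    "event_time": "2024-01-15T10:00:00Z",
    "watermark": "2024-01-15T10:00:30Z",
}))
# -> {'type': 'late_event', 'in_reply_to': 2, 'event_id': 1, 'handled': 'dropped'}
```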
Sample Test Cases
Generate watermark (timeout: 5000ms)
Input
{
  "src": "generator",
  "dest": "processor",
  "body": {
    "type": "watermark",
    "msg_id": 1,
    "max_timestamp": "2024-01-15T10:00:00Z",
    "allowed_lateness_ms": 30000
  }
}
Expected Output
{"type": "watermark", "in_reply_to": 1, "watermark": "2024-01-15T09:59:30Z"}
Handle late event (timeout: 5000ms)
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "process",
    "msg_id": 1,
    "event": {
      "id": 1
    },
    "event_time": "2024-01-15T10:00:00Z",
    "watermark": "2024-01-15T10:00:30Z"
  }
}
Expected Output
{"type": "late_event", "in_reply_to": 1, "event_id": 1, "handled": "dropped"}
Hints
Hint 1
Watermark = max event timestamp seen so far - allowed_lateness_ms
Hint 2
An event is late if its event_time < current watermark
Hint 3
A window closes when the watermark passes window_end
Hint 4
Late events that fall within a still-open window are accepted; others are dropped
Hint 5
The watermark only moves forward; it never decreases
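Hints 3 and 4 describe a more general policy than the graded rule: a late event is still accepted while its window is open. The sketch below illustrates that policy with tumbling windows. The one-minute `WINDOW` size and the `window_end`/`classify` helpers are hypothetical, because the exercise does not define windows. The sketch also treats a watermark equal to window_end as closing the window, which is an assumption about what "passes" means.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)  # hypothetical tumbling-window size


def window_end(event_time: datetime) -> datetime:
    # Start of the tumbling window containing event_time, aligned to the Unix epoch.
    epoch = datetime(1970, 1, 1, tzinfo=event_time.tzinfo)
    start = event_time - (event_time - epoch) % WINDOW
    return start + WINDOW


def classify(event_time: datetime, watermark: datetime) -> str:
    if event_time >= watermark:
        return "on_time"
    # Late (Hint 2). Assumed: the window is closed once watermark >= window_end (Hint 3).
    if watermark < window_end(event_time):
        return "late_accepted"  # Hint 4: late, but its window is still open
    return "late_dropped"


wm = datetime(2024, 1, 15, 10, 0, 30)
for t in (datetime(2024, 1, 15, 10, 0, 40),
          datetime(2024, 1, 15, 10, 0, 0),
          datetime(2024, 1, 15, 9, 59, 50)):
    print(f"{t:%H:%M:%S} -> {classify(t, wm)}")
```

Under this policy, the sample `process` event (10:00:00 against a 10:00:30 watermark) would be accepted, because its window [10:00:00, 10:01:00) is still open. The sample test instead expects `dropped`, which matches the simpler rule that every late event is dropped.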
OVERVIEW
Key Concepts
watermarks, out-of-order events, event time, allowed lateness, late event handling
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
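Here is one way to fill in the stub, offered as a sketch rather than a reference solution. Like the starter code, it reads one JSON message per line and dispatches on `body.type`. Two assumptions are worth flagging. First, it wraps replies in an envelope with `src` and `dest` swapped, following the Maelstrom convention. The expected outputs show only the reply body, so check what the harness actually compares. Second, the `on_time_event` reply for non-late events is hypothetical, because the spec does not define one. Unknown message types are silently ignored.

```python
#!/usr/bin/env python3
import json
import sys
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_ts(ts: str) -> datetime:
    # fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def format_ts(dt: datetime) -> str:
    # UTC with a "Z" suffix; milliseconds only when non-zero (assumed format).
    dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    spec = "milliseconds" if dt.microsecond else "seconds"
    return dt.isoformat(timespec=spec) + "Z"


def handle(body: dict) -> Optional[dict]:
    kind = body.get("type")
    if kind == "watermark":
        # Watermark = max_timestamp - allowed_lateness_ms.
        wm = parse_ts(body["max_timestamp"]) - timedelta(
            milliseconds=body["allowed_lateness_ms"])
        return {"type": "watermark", "in_reply_to": body["msg_id"],
                "watermark": format_ts(wm)}
    if kind == "process":
        event_id = body["event"]["id"]
        if parse_ts(body["event_time"]) < parse_ts(body["watermark"]):
            # Late events are dropped in this exercise.
            return {"type": "late_event", "in_reply_to": body["msg_id"],
                    "event_id": event_id, "handled": "dropped"}
        # Hypothetical reply: the spec does not define one for on-time events.
        return {"type": "on_time_event", "in_reply_to": body["msg_id"],
                "event_id": event_id}
    return None  # message type not covered by this exercise


def main() -> None:
    for line in sys.stdin:
        if not line.strip():
            continue  # tolerate blank lines between messages
        msg = json.loads(line)
        reply = handle(msg["body"])
        if reply is None:
            continue
        # Assumed Maelstrom-style envelope: the reply goes back to the sender.
        out = {"src": msg.get("dest"), "dest": msg.get("src"), "body": reply}
        print(json.dumps(out), flush=True)


if __name__ == "__main__":
    main()
```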