ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/mapreducer/tasks/task-28-2-4-watermarks
TASK

Implementation

Events in a distributed stream do not always arrive in the order they occurred. A click at 10:00:00 may arrive after a click at 10:00:05 due to network delays. Without special handling, the 10:00:00 event is dropped if its window has already closed by the time it arrives.

Watermarks solve this: the watermark represents the point in event time up to which the processor believes it has seen all data. It advances as newer events arrive, and a window only closes once the watermark passes its end boundary.

allowed_lateness = 30s

Events arriving:
  10:00:10  -> watermark = 10:00:10 - 30s = 09:59:40
  10:00:30  -> watermark = 10:00:30 - 30s = 10:00:00
  10:00:00  -> arrives out of order, but event_time (10:00:00) is not < watermark (10:00:00) -> on time, accepted
  10:01:00  -> watermark = 10:01:00 - 30s = 10:00:30
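
The arrival sequence above can be replayed with a short Python sketch. It assumes event times are plain `HH:MM:SS` strings and that each watermark shown is the value after the event is observed; `trace` and `ALLOWED_LATENESS` are illustrative names, not part of the task's interface.

```python
from datetime import datetime, timedelta

ALLOWED_LATENESS = timedelta(seconds=30)  # matches allowed_lateness above


def trace(arrivals):
    """Replay 'HH:MM:SS' event times in arrival order.

    Returns one (event_time, watermark, is_late) tuple per event, where
    watermark is the value after that event has been observed.
    """
    max_seen = None
    rows = []
    for raw in arrivals:
        t = datetime.strptime(raw, "%H:%M:%S")
        # Track the max event time seen, so the watermark never moves backward.
        if max_seen is None or t > max_seen:
            max_seen = t
        watermark = max_seen - ALLOWED_LATENESS
        # Only an event strictly earlier than the watermark counts as late.
        is_late = t < watermark
        rows.append((raw, watermark.strftime("%H:%M:%S"), is_late))
    return rows


for row in trace(["10:00:10", "10:00:30", "10:00:00", "10:01:00"]):
    print(row)
```

Because the out-of-order 10:00:00 event is exactly at the watermark, none of the four events is late; an event at 10:00:20 arriving after 10:01:00 would be.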

Your node handles two message types:

// Compute the watermark given the max timestamp seen
{ "type": "watermark", "msg_id": 1,
  "max_timestamp": "2024-01-15T10:00:00Z",
  "allowed_lateness_ms": 30000 }
-> { "type": "watermark", "in_reply_to": 1,
    "watermark": "2024-01-15T09:59:30Z" }

// Process an event — determine if it is late or on-time
{ "type": "process", "msg_id": 2,
  "event": {"id": 1},
  "event_time": "2024-01-15T10:00:00Z",
  "watermark":  "2024-01-15T10:00:30Z" }
-> { "type": "late_event", "in_reply_to": 2,
    "event_id": 1, "handled": "dropped" }

An event is late when event_time < watermark. Late events are dropped.
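
A minimal sketch of the two handlers in Python, assuming timestamps are UTC with whole-second precision in the `2024-01-15T10:00:00Z` form used above. The helper names (`parse_ts`, `handle_watermark`, `handle_process`, `handle`) are hypothetical, and because the task does not say how to reply to an on-time event, `handle_process` returns `None` for that case as a placeholder.

```python
from datetime import datetime, timedelta

# Assumption: UTC timestamps with whole-second precision and a literal "Z".
# Sub-second parts of a lateness value would be truncated by strftime below.
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_ts(s):
    return datetime.strptime(s, TS_FORMAT)


def format_ts(dt):
    return dt.strftime(TS_FORMAT)


def handle_watermark(body):
    # Watermark = max event timestamp seen - allowed lateness.
    max_ts = parse_ts(body["max_timestamp"])
    lateness = timedelta(milliseconds=body["allowed_lateness_ms"])
    return {
        "type": "watermark",
        "in_reply_to": body["msg_id"],
        "watermark": format_ts(max_ts - lateness),
    }


def handle_process(body):
    event_time = parse_ts(body["event_time"])
    watermark = parse_ts(body["watermark"])
    if event_time < watermark:
        # Strictly before the watermark: late, so it is dropped.
        return {
            "type": "late_event",
            "in_reply_to": body["msg_id"],
            "event_id": body["event"]["id"],
            "handled": "dropped",
        }
    # Placeholder: the reply for an on-time event is not specified.
    return None


HANDLERS = {"watermark": handle_watermark, "process": handle_process}


def handle(body):
    # Dispatch on the message type.
    return HANDLERS[body["type"]](body)


# Same body as the first sample test case.
print(handle({"type": "watermark", "msg_id": 1,
              "max_timestamp": "2024-01-15T10:00:00Z",
              "allowed_lateness_ms": 30000}))
```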

Sample Test Cases

Generate watermark (timeout: 5000ms)
Input
{
  "src": "generator",
  "dest": "processor",
  "body": {
    "type": "watermark",
    "msg_id": 1,
    "max_timestamp": "2024-01-15T10:00:00Z",
    "allowed_lateness_ms": 30000
  }
}
Expected Output
{"type": "watermark", "in_reply_to": 1, "watermark": "2024-01-15T09:59:30Z"}
Handle late event (timeout: 5000ms)
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "process",
    "msg_id": 1,
    "event": {
      "id": 1
    },
    "event_time": "2024-01-15T10:00:00Z",
    "watermark": "2024-01-15T10:00:30Z"
  }
}
Expected Output
{"type": "late_event", "in_reply_to": 1, "event_id": 1, "handled": "dropped"}
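
The sample inputs wrap each body in an envelope with `src` and `dest`, while the expected outputs show only the reply body. The sketch below assumes a Maelstrom-style transport (one JSON envelope per line, replies addressed back by swapping `src` and `dest`); that transport and the names `make_reply`, `serve`, and `stub_handler` are assumptions, not something the task specifies. A real node would presumably pass `sys.stdin` and `sys.stdout` instead of the in-memory streams used here.

```python
import io
import json


def make_reply(msg, body):
    # Assumption: replies go back to the sender by swapping src and dest.
    return {"src": msg["dest"], "dest": msg["src"], "body": body}


def serve(lines, handle, out):
    """Read one JSON envelope per line, pass its body to `handle`,
    and write each non-None reply as one JSON line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between messages
        msg = json.loads(line)
        reply_body = handle(msg["body"])
        if reply_body is not None:
            out.write(json.dumps(make_reply(msg, reply_body)) + "\n")
            out.flush()


# Hypothetical stub handler standing in for the real watermark/process dispatch.
def stub_handler(body):
    return {"type": body["type"], "in_reply_to": body["msg_id"]}


request = {"src": "generator", "dest": "processor",
           "body": {"type": "watermark", "msg_id": 1}}
out = io.StringIO()
serve(io.StringIO(json.dumps(request) + "\n"), stub_handler, out)
print(out.getvalue().strip())
```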

Hints

Hint 1
Watermark = max event timestamp seen so far - allowed_lateness_ms
Hint 2
An event is late if its event_time < current watermark
Hint 3
A window closes when the watermark passes window_end
Hint 4
Late events that fall within a still-open window are accepted; others are dropped
Hint 5
The watermark only moves forward — it never decreases
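
The hints can be tied together in a stateful tracker. This is a hypothetical extension: the `process` message above carries its watermark explicitly, whereas this sketch derives it from the events it sees. It assumes tumbling windows of `window_ms`, integer millisecond timestamps, and that a window counts as closed once the watermark reaches its (exclusive) end; `WatermarkTracker` and its status strings are illustrative names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class WatermarkTracker:
    window_ms: int            # tumbling window size (assumption)
    allowed_lateness_ms: int
    max_seen: Optional[int] = None
    open_windows: Dict[int, List[int]] = field(default_factory=dict)  # start -> event ids
    emitted: Dict[int, List[int]] = field(default_factory=dict)       # closed windows

    def watermark(self) -> Optional[int]:
        # Hint 1: watermark = max event time seen - allowed lateness.
        if self.max_seen is None:
            return None
        return self.max_seen - self.allowed_lateness_ms

    def on_event(self, event_id: int, ts: int) -> str:
        # Hint 5: taking the max keeps the watermark from ever decreasing.
        self.max_seen = ts if self.max_seen is None else max(self.max_seen, ts)
        wm = self.watermark()
        start = ts - ts % self.window_ms
        end = start + self.window_ms
        late = ts < wm  # Hint 2
        if wm >= end:
            # Hint 4: the event's window has already closed, so drop it.
            status = "dropped"
        else:
            # Hint 4: on-time events and late events in an open window are kept.
            self.open_windows.setdefault(start, []).append(event_id)
            status = "accepted_late" if late else "on_time"
        self._close_windows(wm)
        return status

    def _close_windows(self, wm: int) -> None:
        # Hint 3: emit every window whose end the watermark has reached.
        for start in sorted(self.open_windows):  # sorted() copies the keys
            if start + self.window_ms <= wm:
                self.emitted[start] = self.open_windows.pop(start)


# Timestamps are millisecond offsets from 10:00:00.
tracker = WatermarkTracker(window_ms=60_000, allowed_lateness_ms=30_000)
arrivals = [(1, 10_000), (2, 30_000), (3, 0), (4, 60_000),
            (5, 20_000), (6, 100_000), (7, 50_000)]
for event_id, ts in arrivals:
    print(event_id, tracker.on_event(event_id, ts))
print(tracker.emitted)
```

With a 60 s window and 30 s of lateness, event 5 (at 20 s) is late but its window is still open, so it is accepted; event 7 (at 50 s) arrives after the watermark has reached 70 s, so its window has closed and it is dropped.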
OVERVIEW

Key Concepts

watermarks, out-of-order events, event time, allowed lateness, late event handling
Handle Out-of-Order Events with Watermarks - The MapReducer | Build Distributed Systems