ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/mapreducer/tasks/task-28-2-2-tumbling-windows
TASK

Implementation

Tumbling windows divide an infinite stream into fixed-size, non-overlapping time buckets. Each event belongs to exactly one window. When the window period ends, you emit the aggregate and start a fresh window.

Events:  e1(10:00:10)  e2(10:00:40)  e3(10:01:15)  e4(10:01:50)
Windows: [---- 10:00 - 10:01 ----]   [---- 10:01 - 10:02 ----]
         e1, e2  →  count=2           e3, e4  →  count=2
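A minimal sketch of the diagram's bucketing in Python, the language of this task's main.py. It uses plain time-of-day strings so no date parsing is needed. The event names and times are copied from the diagram, and `seconds_of_day` and `window_label` are hypothetical helper names. Flooring each time to a multiple of 60 seconds places every event in exactly one window.

```python
# Reproduce the diagram: bucket each event's time of day into 60-second tumbling windows.
from collections import Counter

WINDOW_S = 60  # 1-minute windows, matching window_size_ms = 60000

events = {"e1": "10:00:10", "e2": "10:00:40", "e3": "10:01:15", "e4": "10:01:50"}


def seconds_of_day(hms: str) -> int:
    # "HH:MM:SS" -> seconds since midnight
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


def window_label(start_s: int) -> str:
    # Format a window start (seconds since midnight) as "HH:MM"
    return f"{start_s // 3600:02d}:{start_s % 3600 // 60:02d}"


counts = Counter()
for hms in events.values():
    start = seconds_of_day(hms) // WINDOW_S * WINDOW_S  # floor to the window start
    counts[window_label(start)] += 1

print(dict(counts))  # {'10:00': 2, '10:01': 2}
```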

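Your node handles three message types. Two of them, "assign" and "process_window", are shown below; the third, "close", appears only in the hints.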

// Assign a single event to its window (window_size_ms = 60000 → 1-minute windows)
{ "type": "assign", "msg_id": 1,
  "events": [{"id":1,"timestamp":"2024-01-15T10:00:10Z"}],
  "window_size_ms": 60000 }
→ { "type": "assigned", "in_reply_to": 1,
    "window_id": "window-1705305600000",
    "window_start": "2024-01-15T10:00:00Z",
    "window_end":   "2024-01-15T10:01:00Z" }
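One way to answer "assign" is sketched below, assuming timestamps are ISO-8601 strings in UTC ending in "Z". It also assumes that, because the reply carries a single window_id, only the first event in the list needs assigning. The names `handle_assign`, `to_epoch_ms` and `to_iso` are hypothetical. Computed in UTC, this input yields `window-1705312800000` (10:00:00Z) rather than the `window-1705305600000` shown. The window_start and window_end strings do match the sample.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(ts: str) -> int:
    # Assumes UTC ISO-8601 such as "2024-01-15T10:00:10Z"; "Z" is swapped for
    # "+00:00" so fromisoformat also accepts it on Python versions before 3.11.
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    return (dt - EPOCH) // timedelta(milliseconds=1)  # exact integer milliseconds


def to_iso(ms: int) -> str:
    # Epoch milliseconds back to "YYYY-MM-DDTHH:MM:SSZ"
    return (EPOCH + timedelta(milliseconds=ms)).strftime("%Y-%m-%dT%H:%M:%SZ")


def handle_assign(body: dict) -> dict:
    size = body["window_size_ms"]
    event = body["events"][0]  # assumption: the sample sends exactly one event
    start = to_epoch_ms(event["timestamp"]) // size * size  # floor to window start
    return {
        "type": "assigned",
        "in_reply_to": body["msg_id"],
        "window_id": f"window-{start}",
        "window_start": to_iso(start),
        "window_end": to_iso(start + size),  # end = start + window size
    }
```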

// Process a stream of events and return window aggregates
{ "type": "process_window", "msg_id": 2,
  "events": [
    {"id":1,"timestamp":"2024-01-15T10:00:10Z"},
    {"id":2,"timestamp":"2024-01-15T10:00:40Z"},
    {"id":3,"timestamp":"2024-01-15T10:01:15Z"}
  ],
  "window_size_ms": 60000 }
→ { "type": "window_result", "in_reply_to": 2,
    "windows": [
      {"window_id":"window-1705305600000","count":2,"events":[1,2]},
      {"window_id":"window-1705305660000","count":1,"events":[3]}
    ]}
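A sketch of "process_window" as a group-by on the window start, assuming UTC timestamps as in the sample. The function name `handle_process_window` is hypothetical. So is the ordering: windows are returned oldest first, and event IDs within a window keep their input order. The sample output satisfies both, but the page does not state them as requirements.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(ts: str) -> int:
    # UTC ISO-8601 timestamp -> integer epoch milliseconds
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    return (dt - EPOCH) // timedelta(milliseconds=1)


def handle_process_window(body: dict) -> dict:
    size = body["window_size_ms"]
    buckets = defaultdict(list)  # window start (ms) -> event ids in arrival order
    for event in body["events"]:
        start = to_epoch_ms(event["timestamp"]) // size * size
        buckets[start].append(event["id"])
    windows = [
        {"window_id": f"window-{start}", "count": len(ids), "events": ids}
        for start, ids in sorted(buckets.items())  # oldest window first
    ]
    return {"type": "window_result", "in_reply_to": body["msg_id"], "windows": windows}
```

Because each timestamp maps to exactly one floored start, no event can land in two buckets. That is the non-overlap property of tumbling windows.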

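Window ID formula: window_id = "window-" + window_start_ms, where window_start_ms = floor(timestamp_ms / window_size_ms) * window_size_ms and timestamp_ms is the event time in Unix epoch milliseconds.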
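The same formula as a worked equation, with t the event time in epoch milliseconds and w the window size. The numbers are computed in UTC for the first sample event, 2024-01-15T10:00:10Z.

```latex
\text{window\_start} = \left\lfloor \frac{t}{w} \right\rfloor \cdot w,
\qquad \text{window\_end} = \text{window\_start} + w

% t = 1705312810000 ms (2024-01-15T10:00:10Z), w = 60000 ms
\left\lfloor \frac{1705312810000}{60000} \right\rfloor \cdot 60000
  = 28421880 \cdot 60000 = 1705312800000 \quad (\text{10:00:00Z})

\text{window\_end} = 1705312800000 + 60000 = 1705312860000 \quad (\text{10:01:00Z})
```

Computed this way, 10:00:00Z is 1705312800000 ms. The samples instead use window-1705305600000, which is 7,200,000 ms (two hours) earlier and corresponds to 08:00:00Z, while their window_start and window_end strings agree with the UTC result. If your IDs differ from the expected output, check which value the grader actually compares.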

Sample Test Cases

Assign events to windows (timeout: 5000ms)
Input
{
  "src": "stream",
  "dest": "windower",
  "body": {
    "type": "assign",
    "msg_id": 1,
    "events": [
      {
        "id": 1,
        "timestamp": "2024-01-15T10:00:10Z"
      }
    ],
    "window_size_ms": 60000
  }
}
Expected Output
{"type": "assigned", "in_reply_to": 1, "window_id": "window-1705305600000", "window_start": "2024-01-15T10:00:00Z", "window_end": "2024-01-15T10:01:00Z"}

Process multiple windows (timeout: 5000ms)
Input
{
  "src": "stream",
  "dest": "windower",
  "body": {
    "type": "process_window",
    "msg_id": 1,
    "events": [
      {
        "id": 1,
        "timestamp": "2024-01-15T10:00:10Z"
      },
      {
        "id": 2,
        "timestamp": "2024-01-15T10:00:40Z"
      },
      {
        "id": 3,
        "timestamp": "2024-01-15T10:01:15Z"
      }
    ],
    "window_size_ms": 60000
  }
}
Expected Output
{"type": "window_result", "in_reply_to": 1, "windows": [{"window_id": "window-1705305600000", "count": 2, "events": [1, 2]}, {"window_id": "window-1705305660000", "count": 1, "events": [3]}]}
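The test inputs arrive wrapped in an envelope with "src", "dest" and "body". A minimal message loop for main.py might look like the sketch below. It assumes a Maelstrom-style convention that this page does not confirm: one JSON message per line on stdin, and one reply per line on stdout with src and dest swapped. The expected outputs above show only the reply body. The names `handle_line`, `serve` and `HANDLERS` are hypothetical, and timestamps are assumed to be UTC.

```python
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start_ms(ts: str, size: int) -> int:
    # Event time as UTC epoch milliseconds, floored to its window start
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    return (dt - EPOCH) // timedelta(milliseconds=1) // size * size


def iso(ms: int) -> str:
    return (EPOCH + timedelta(milliseconds=ms)).strftime("%Y-%m-%dT%H:%M:%SZ")


def assign(body: dict) -> dict:
    size = body["window_size_ms"]
    start = window_start_ms(body["events"][0]["timestamp"], size)
    return {"type": "assigned", "in_reply_to": body["msg_id"],
            "window_id": f"window-{start}",
            "window_start": iso(start), "window_end": iso(start + size)}


def process_window(body: dict) -> dict:
    size = body["window_size_ms"]
    buckets = defaultdict(list)
    for ev in body["events"]:
        buckets[window_start_ms(ev["timestamp"], size)].append(ev["id"])
    windows = [{"window_id": f"window-{s}", "count": len(ids), "events": ids}
               for s, ids in sorted(buckets.items())]
    return {"type": "window_result", "in_reply_to": body["msg_id"], "windows": windows}


HANDLERS = {"assign": assign, "process_window": process_window}


def handle_line(line: str) -> str:
    msg = json.loads(line)
    reply = HANDLERS[msg["body"]["type"]](msg["body"])  # dispatch on body.type
    # Address the reply back to the sender by swapping src and dest.
    return json.dumps({"src": msg["dest"], "dest": msg["src"], "body": reply})


def serve(lines, out=sys.stdout) -> None:
    # One JSON message per non-blank input line, one JSON reply per output line.
    for line in lines:
        if line.strip():
            out.write(handle_line(line) + "\n")
            out.flush()
```

Under these assumptions, the entry point in main.py would be `serve(sys.stdin)`. Keeping the handlers in a dict means adding a third message type is a single new entry.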

Hints

Hint 1
Window start = floor(event_timestamp_ms / window_size_ms) * window_size_ms; the window ID is "window-" followed by that start value
Hint 2
Each event belongs to exactly one window — windows never overlap
Hint 3
Window end = window_start + window_size_ms
Hint 4
Aggregate events per window_id: keep a count and a list of event IDs
Hint 5
close emits the result for a window and removes that window from the active state
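Hints 4 and 5 describe per-window state that is emitted once and then dropped. The sketch below is one way to hold that state, assuming events are fed one at a time with integer epoch-millisecond timestamps. The page does not show the wire format of a "close" message, so `TumblingWindower`, `add` and `close` are hypothetical names that model the behaviour rather than the protocol.

```python
from dataclasses import dataclass, field


@dataclass
class WindowState:
    count: int = 0                              # running count (Hint 4)
    events: list = field(default_factory=list)  # event ids in arrival order


class TumblingWindower:
    def __init__(self, size_ms: int):
        self.size_ms = size_ms
        self.active = {}  # window_id -> WindowState for windows not yet closed

    def add(self, event_id, timestamp_ms: int) -> str:
        # Each event maps to exactly one window, so it is recorded exactly once.
        start = timestamp_ms // self.size_ms * self.size_ms
        window_id = f"window-{start}"
        state = self.active.setdefault(window_id, WindowState())
        state.count += 1
        state.events.append(event_id)
        return window_id

    def close(self, window_id: str) -> dict:
        # Emit the aggregate and drop the window from active state (Hint 5);
        # raises KeyError if the window is unknown or already closed.
        state = self.active.pop(window_id)
        return {"window_id": window_id, "count": state.count, "events": state.events}
```

Popping on close keeps memory proportional to the number of open windows rather than to the length of the stream.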

Key Concepts

tumbling windows, time-based windows, window aggregation, non-overlapping windows, event time
Implement Tumbling Windows - The MapReducer | Build Distributed Systems