ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/mapreducer/tasks/task-28-2-1-streaming-wordcount
TASK

Implementation

Batch MapReduce waits for all data before producing output. Stream processing handles an infinite flow of events: state is updated as each event arrives, and results can be queried at any time.

Your node maintains a running word count across all received messages:

// Process a batch of words — update running counts
{ "type": "process", "msg_id": 1, "words": ["hello", "world", "hello"] }
{ "type": "processed", "in_reply_to": 1, "counts": {"hello": 2, "world": 1} }

// Return the top N words by count
{ "type": "topn", "msg_id": 2, "n": 2,
  "counts": {"hello": 5, "world": 3, "stream": 1} }
{ "type": "topn", "in_reply_to": 2,
  "top_words": [["hello", 5], ["world", 3]] }

// Increment a single word and return its new count
{ "type": "update", "msg_id": 3, "word": "hello", "current_count": 5 }
{ "type": "updated", "in_reply_to": 3, "word": "hello", "new_count": 6 }

// Emit top N from current in-memory state (periodic output)
{ "type": "output", "msg_id": 4, "interval_ms": 1000, "counts": {"hello": 10} }
{ "type": "periodic_output", "in_reply_to": 4,
  "top_words": [["hello", 10]] }

Unlike batch processing, the node never resets counts between messages — every process call adds to the global running totals.
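The running-total behaviour can be sketched as follows. This is a minimal illustration, not the reference solution: the class name `RunningWordCount` is hypothetical, and it assumes the `processed` reply reports the totals accumulated so far rather than the counts of the latest batch only.

```python
from collections import Counter


class RunningWordCount:
    """Holds word totals that persist across process messages."""

    def __init__(self):
        self.counts = Counter()

    def process(self, words):
        # Add this batch to the running totals; nothing is reset.
        self.counts.update(words)
        return dict(self.counts)


state = RunningWordCount()
first = state.process(["hello", "world", "hello"])
second = state.process(["hello", "stream"])
print(first)   # totals after the first batch
print(second)  # totals after both batches
```

The second call builds on the first: `hello` ends at 3 because the earlier two occurrences are still in memory.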

Sample Test Cases

Process word stream
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "process",
    "msg_id": 1,
    "words": [
      "hello",
      "world",
      "hello"
    ]
  }
}
Expected Output
{"type": "processed", "in_reply_to": 1, "counts": {"hello": 2, "world": 1}}

Output top N words
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "topn",
    "msg_id": 1,
    "n": 2,
    "counts": {
      "hello": 5,
      "world": 3,
      "stream": 1
    }
  }
}
Expected Output
{"type": "topn", "in_reply_to": 1, "top_words": [["hello", 5], ["world", 3]]}
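One way to map each request body to its reply is a single dispatch function, sketched below and checked by hand against the two sample cases. Several things here are assumptions rather than facts from the task: the names `handle`, `running`, and `_top` are hypothetical; the sketch uses the `counts` supplied in a `topn` or `output` request when present and falls back to in-memory state otherwise; `update` returns `current_count + 1` without writing to state; and `output` returns every word because the request carries no `n`. Reading the `src`/`dest` envelope and writing replies to the transport are left out.

```python
from collections import Counter

running = Counter()  # node-wide totals, never reset between messages


def _top(counts, n=None):
    # Highest count first; sorted() is stable, so ties keep dict order.
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return [[word, count] for word, count in ranked[:n]]


def handle(body):
    """Map one request body to its reply body (hypothetical helper)."""
    kind, msg_id = body["type"], body["msg_id"]
    if kind == "process":
        running.update(body["words"])
        return {"type": "processed", "in_reply_to": msg_id,
                "counts": dict(running)}
    if kind == "topn":
        counts = body.get("counts", running)  # assumption: request wins
        return {"type": "topn", "in_reply_to": msg_id,
                "top_words": _top(counts, body["n"])}
    if kind == "update":
        return {"type": "updated", "in_reply_to": msg_id,
                "word": body["word"],
                "new_count": body["current_count"] + 1}
    if kind == "output":
        counts = body.get("counts", running)  # assumption: request wins
        return {"type": "periodic_output", "in_reply_to": msg_id,
                "top_words": _top(counts)}
    raise ValueError(f"unknown message type: {kind!r}")
```

Calling `handle` with the `body` of each sample input should produce the corresponding expected output, provided the grader compares JSON by value rather than by key order.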

Hints

Hint 1
Keep a running word count dict in memory, update on every process message
Hint 2
topn: sort the dict by count descending, return the first N entries
Hint 3
update increments a single word by 1 and returns the new count
Hint 4
output emits the top N words from current state without resetting it
Hint 5
Lowercase and strip words before counting
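Hints 2 and 5 can be illustrated with two small helpers. This is a sketch under stated assumptions: the names `normalize` and `top_n` are hypothetical, "strip" is taken to mean removing surrounding whitespace (not punctuation), and breaking ties alphabetically is a choice made here for deterministic output, since the task does not specify tie order.

```python
def normalize(word):
    # Hint 5: lowercase and strip surrounding whitespace (assumed meaning).
    return word.strip().lower()


def top_n(counts, n):
    # Hint 2: highest count first. The alphabetical tie-break is an
    # assumption, added so equal counts always come out in the same order.
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [[word, count] for word, count in ranked[:n]]


counts = {}
for raw in [" Hello", "world ", "HELLO", "stream", "World"]:
    word = normalize(raw)
    if word:  # skip tokens that are empty after stripping
        counts[word] = counts.get(word, 0) + 1

result = top_n(counts, 2)
print(result)
```

Normalizing before counting means `" Hello"` and `"HELLO"` land on the same key, so the totals reflect words rather than spellings.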
OVERVIEW

Key Concepts

stream processing, stateful processing, running aggregates, top-N, incremental updates
Implement Streaming Word Count - The MapReducer | Build Distributed Systems