ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/mapreducer/tasks/task-28-1-5-chained-mapreduce
TASK

Implementation

Complex data analysis often needs multiple MapReduce stages. A chained pipeline feeds the output of one job directly as input to the next, keeping each job focused on a single transformation.
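
Job chaining comes down to a loop that feeds each job's output into the next job and records every intermediate result. This is a minimal in-process sketch. `run_chain` and the idea of a job as a plain Python function are assumptions made for illustration. They are not the platform's API.

```python
from typing import Any, Callable, List, Sequence

# Hypothetical representation: a "job" is any function that maps the
# previous stage's output to the next stage's input.
Job = Callable[[Any], Any]


def run_chain(jobs: Sequence[Job], data: Any) -> List[Any]:
    """Run jobs in order, feeding each job's output into the next one.

    Returns the output of every stage so intermediate results can be inspected.
    """
    outputs: List[Any] = []
    for job in jobs:
        data = job(data)  # output of job[i] becomes input of job[i+1]
        outputs.append(data)
    return outputs


if __name__ == "__main__":
    # Toy two-job chain: flatten lines into words, then count the words.
    stages = run_chain(
        [lambda lines: [w for line in lines for w in line.split()], len],
        ["hello world", "hello there"],
    )
    print(stages)  # [['hello', 'world', 'hello', 'there'], 4]
```

Returning the whole list of stage outputs, rather than only the final one, is what lets a caller inspect each stage. The pipeline message below needs exactly that.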

Example use case: find the top N most frequent words across a document set.

Stage 1 (word count):  ["hello world", "hello there"]
                       → {"hello":2, "world":1, "there":1}

Stage 2 (sort by freq): {"hello":2, "world":1, "there":1}
                        → [["hello",2], ["there",1], ["world",1]]

Stage 3 (top N):        [["hello",2], ["there",1], ["world",1]], N=2
                        → [["hello",2], ["there",1]]
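
The three stages can be written as three separate functions, each taking the previous stage's output. This is a sketch under a few assumptions. The function names are hypothetical, words are split on whitespace only, and stage 2 applies the alphabetical tiebreak this task requires for equal frequencies.

```python
from typing import Dict, List


def stage1_word_count(lines: List[str]) -> Dict[str, int]:
    # Count whitespace-separated words; a dict keeps first-seen key order.
    counts: Dict[str, int] = {}
    for line in lines:
        for word in line.split():
            counts[word] = counts.get(word, 0) + 1
    return counts


def stage2_sort_by_freq(counts: Dict[str, int]) -> List[list]:
    # Frequency descending, then word ascending as the tiebreaker.
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    # Emit [word, count] pairs to match the JSON array-of-arrays shape.
    return [[word, count] for word, count in ordered]


def stage3_top_n(ranked: List[list], n: int) -> List[list]:
    # The input is already ranked, so the top N is just a prefix.
    return ranked[:n]


if __name__ == "__main__":
    s1 = stage1_word_count(["hello world", "hello there"])
    s2 = stage2_sort_by_freq(s1)
    s3 = stage3_top_n(s2, 2)
    print(s1, s2, s3)
```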

Your node handles a single pipeline message that runs all three stages and returns each stage's output:

Request:

{ "type": "pipeline", "msg_id": 1,
  "lines": ["hello world", "hello there", "world peace"],
  "top_n": 2 }

Reply:

{ "type": "pipeline_result", "in_reply_to": 1,
  "stage1": {"hello":2,"world":2,"there":1,"peace":1},
  "stage2": [["hello",2],["world",2],["peace",1],["there",1]],
  "stage3": [["hello",2],["world",2]] }
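
A handler can build the reply body from the request body in one function. This is a sketch: `handle_pipeline` is a hypothetical name, and it assumes words are separated by whitespace. It relies on `collections.Counter` being a `dict` subclass that inserts keys in the order they are first seen, so `stage1` lists words in first-occurrence order, as the expected output does.

```python
import json
from collections import Counter
from typing import Any, Dict


def handle_pipeline(body: Dict[str, Any]) -> Dict[str, Any]:
    """Build a pipeline_result body from a pipeline request body."""
    # Stage 1: word count, keys in first-seen order.
    stage1 = dict(Counter(w for line in body["lines"] for w in line.split()))
    # Stage 2: frequency descending, ties broken alphabetically.
    ranked = sorted(stage1.items(), key=lambda kv: (-kv[1], kv[0]))
    stage2 = [[word, count] for word, count in ranked]
    # Stage 3: the first top_n entries of the ranked list.
    stage3 = stage2[: body["top_n"]]
    return {
        "type": "pipeline_result",
        "in_reply_to": body["msg_id"],
        "stage1": stage1,
        "stage2": stage2,
        "stage3": stage3,
    }


if __name__ == "__main__":
    request = {
        "type": "pipeline",
        "msg_id": 1,
        "lines": ["hello world", "hello there", "world peace"],
        "top_n": 2,
    }
    print(json.dumps(handle_pipeline(request)))
```

With `json.dumps` defaults, this prints the same body as the "Run full pipeline" expected output further down.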

When frequencies are equal (hello and world both 2), sort those keys alphabetically as a tiebreaker so the output is deterministic.
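
The tiebreak is easy to get wrong with Python's `sorted`. Here is a small illustration using the word counts from the example message. Negating the count lets one ascending sort order by count descending and by word ascending. Passing `reverse=True` on a `(count, word)` key flips both components instead.

```python
from typing import Dict, List, Tuple

counts: Dict[str, int] = {"world": 2, "hello": 2, "peace": 1, "there": 1}

# Correct: negate the count so a single ascending sort yields
# count descending and word ascending at the same time.
good: List[Tuple[str, int]] = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))

# Pitfall: reverse=True reverses the whole tuple comparison, so tied
# words come out in reverse alphabetical order.
bad: List[Tuple[str, int]] = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)

print(good)  # [('hello', 2), ('world', 2), ('peace', 1), ('there', 1)]
print(bad)   # [('world', 2), ('hello', 2), ('there', 1), ('peace', 1)]
```

Python's sort is also stable, and `reverse=True` preserves that stability. An equivalent alternative is therefore to sort by word first, then sort by count with `reverse=True`.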

Sample Test Cases

Run full pipeline (timeout: 5000 ms)
Input
{
  "src": "client",
  "dest": "pipeline",
  "body": {
    "type": "pipeline",
    "msg_id": 1,
    "lines": [
      "hello world",
      "hello there",
      "world peace"
    ],
    "top_n": 2
  }
}
Expected Output
{"type": "pipeline_result", "in_reply_to": 1, "stage1": {"hello": 2, "world": 2, "there": 1, "peace": 1}, "stage2": [["hello", 2], ["world", 2], ["peace", 1], ["there", 1]], "stage3": [["hello", 2], ["world", 2]]}
Top 1 word (timeout: 5000 ms)
Input
{
  "src": "client",
  "dest": "pipeline",
  "body": {
    "type": "pipeline",
    "msg_id": 1,
    "lines": [
      "a b a",
      "a c"
    ],
    "top_n": 1
  }
}
Expected Output
{"type": "pipeline_result", "in_reply_to": 1, "stage1": {"a": 3, "b": 1, "c": 1}, "stage2": [["a", 3], ["b", 1], ["c", 1]], "stage3": [["a", 3]]}
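
The sample inputs wrap each body in an envelope with `src`, `dest`, and `body`. The expected outputs show only the reply body. A node therefore needs an I/O loop around its handler. This sketch assumes a Maelstrom-style convention: one JSON envelope per line on stdin, one reply envelope per line on stdout, with `src` and `dest` swapped. The test harness's actual framing is not stated here, so treat it as an assumption. `serve` and the demo `echo` handler are hypothetical.

```python
import io
import json
import sys
from typing import Any, Callable, Dict, TextIO

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


def serve(handler: Handler, infile: TextIO = sys.stdin, outfile: TextIO = sys.stdout) -> None:
    # Assumed framing: one JSON envelope per line on the input stream.
    for line in infile:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        msg = json.loads(line)
        reply = {
            "src": msg["dest"],  # reply from the node that received the message
            "dest": msg["src"],  # back to the original sender
            "body": handler(msg["body"]),
        }
        outfile.write(json.dumps(reply) + "\n")
        outfile.flush()


if __name__ == "__main__":
    # Demo with a hypothetical echo handler and in-memory streams.
    def echo(body: Dict[str, Any]) -> Dict[str, Any]:
        return {"type": "echo_ok", "in_reply_to": body["msg_id"]}

    src = io.StringIO('{"src": "client", "dest": "pipeline", "body": {"type": "echo", "msg_id": 1}}\n')
    out = io.StringIO()
    serve(echo, src, out)
    print(out.getvalue(), end="")
```

In a real node you would call `serve(...)` with the pipeline handler and the default stdin/stdout streams.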

Hints

Hint 1: Run jobs in order; the output of job[i] becomes the input of job[i+1].
Hint 2: Job 1 is a word count (map → reduce).
Hint 3: Job 2 sorts the word-count results by frequency, descending.
Hint 4: Job 3 takes the top N entries from the sorted list.
Hint 5: Return the intermediate result of each stage so the caller can inspect them.
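
Hint 2 frames job 1 as a map → reduce word count. Below is a minimal in-process sketch of that split. It assumes a single process with no partitioning or parallel workers, and `map_reduce`, `wc_mapper`, and `wc_reducer` are hypothetical names. A real framework would distribute the map and reduce tasks across machines.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Mapper = Callable[[Any], Iterator[Tuple[Any, Any]]]
Reducer = Callable[[Any, List[Any]], Any]


def map_reduce(records: Iterable[Any], mapper: Mapper, reducer: Reducer) -> Dict[Any, Any]:
    # Map: each record emits zero or more (key, value) pairs.
    # Shuffle: group values by key; first-seen key order is preserved.
    groups: Dict[Any, List[Any]] = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    # Reduce: collapse each key's list of values into a single result.
    return {key: reducer(key, values) for key, values in groups.items()}


def wc_mapper(line: str) -> Iterator[Tuple[str, int]]:
    # Emit (word, 1) for every whitespace-separated token.
    for word in line.split():
        yield word, 1


def wc_reducer(word: str, ones: List[int]) -> int:
    # Sum the 1s emitted for this word.
    return sum(ones)


if __name__ == "__main__":
    print(map_reduce(["a b a", "a c"], wc_mapper, wc_reducer))  # {'a': 3, 'b': 1, 'c': 1}
```

Only job 1 needs keyed grouping. The sort and top-N jobs operate on the whole ranked list.
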
OVERVIEW

Key Concepts

pipeline, job chaining, multi-stage processing, intermediate data, top-N, secondary sort
Implement Chained MapReduce Pipeline - The MapReducer | Build Distributed Systems