TASK
Implementation
Complex data analysis often needs multiple MapReduce stages. A chained pipeline feeds the output of one job directly as input to the next, keeping each job focused on a single transformation.
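The chaining idea can be sketched as a small runner that calls each job in turn and keeps every intermediate output. This is only an illustration, not a reference solution: the names `run_chain`, `word_count`, `sort_by_freq` and `top_n` are hypothetical, words are assumed to be separated by whitespace, and the input is borrowed from the "Top 1 word" sample test.

```python
from collections import defaultdict
from functools import partial


def word_count(lines):
    """Job 1: map each word to (word, 1), then reduce by summing per word."""
    mapped = [(word, 1) for line in lines for word in line.split()]
    counts = defaultdict(int)
    for word, one in mapped:
        counts[word] += one
    return dict(counts)


def sort_by_freq(counts):
    """Job 2: order by count descending, breaking ties alphabetically."""
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [[word, count] for word, count in ordered]


def top_n(pairs, n):
    """Job 3: keep the first n entries of an already sorted list."""
    return pairs[:n]


def run_chain(jobs, data):
    """Run jobs in order; each job's output becomes the next job's input."""
    outputs = []
    for job in jobs:
        data = job(data)
        outputs.append(data)
    return outputs


# partial() fixes n so that every job takes exactly one argument.
stages = run_chain([word_count, sort_by_freq, partial(top_n, n=1)],
                   ["a b a", "a c"])
# stages[0] == {"a": 3, "b": 1, "c": 1}
# stages[1] == [["a", 3], ["b", 1], ["c", 1]]
# stages[2] == [["a", 3]]
```

Because each job takes one value and returns one value, a stage can be added or swapped without touching the runner.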
Example use-case: find the top 3 most frequent words across a document set.
Stage 1 (word count): ["hello world", "hello there"]
→ {"hello":2, "world":1, "there":1}
Stage 2 (sort by freq): {"hello":2, "world":1, "there":1}
→ [["hello",2], ["there",1], ["world",1]]
Stage 3 (top N): [["hello",2], ["there",1], ["world",1]], N=2
→ [["hello",2], ["there",1]]
Your node handles a single pipeline message that runs all three stages and returns each stage's output:
{ "type": "pipeline", "msg_id": 1,
"lines": ["hello world", "hello there", "world peace"],
"top_n": 2 }
→ { "type": "pipeline_result", "in_reply_to": 1,
"stage1": {"hello":2,"world":2,"there":1,"peace":1},
"stage2": [["hello",2],["world",2],["peace",1],["there",1]],
"stage3": [["hello",2],["world",2]] }
When frequencies are equal (hello and world are both 2, peace and there are both 1), sort those words alphabetically as a tiebreaker so the output is deterministic.
Sample Test Cases
Run full pipeline
Timeout: 5000ms
Input
{
"src": "client",
"dest": "pipeline",
"body": {
"type": "pipeline",
"msg_id": 1,
"lines": [
"hello world",
"hello there",
"world peace"
],
"top_n": 2
}
}
Expected Output
{"type": "pipeline_result", "in_reply_to": 1, "stage1": {"hello": 2, "world": 2, "there": 1, "peace": 1}, "stage2": [["hello", 2], ["world", 2], ["peace", 1], ["there", 1]], "stage3": [["hello", 2], ["world", 2]]}
Top 1 word
Timeout: 5000ms
Input
{
"src": "client",
"dest": "pipeline",
"body": {
"type": "pipeline",
"msg_id": 1,
"lines": [
"a b a",
"a c"
],
"top_n": 1
}
}
Expected Output
{"type": "pipeline_result", "in_reply_to": 1, "stage1": {"a": 3, "b": 1, "c": 1}, "stage2": [["a", 3], ["b", 1], ["c", 1]], "stage3": [["a", 3]]}
Hints
Hint 1
Run jobs in order: output of job[i] becomes input of job[i+1]
Hint 2
Job 1 is a word count (map → reduce)
Hint 3
Job 2 sorts the word-count results by frequency descending
Hint 4
Job 3 takes the top N entries from the sorted list
Hint 5
Return intermediate results for each stage so the caller can inspect them
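Taken together, the hints suggest a handler that turns one request body into one reply body. The sketch below is one possible shape, not the expected solution: `build_reply` is a hypothetical name, words are assumed to be whitespace-separated, and `Counter` is used because it keeps words in first-seen order, which happens to match the `stage1` ordering in the sample output.

```python
from collections import Counter


def build_reply(body):
    """Run all three stages on one request body and return the reply body."""
    # Stage 1: word count across every line.
    counts = Counter(word for line in body["lines"] for word in line.split())
    # Stage 2: frequency descending, then word ascending as the tiebreaker.
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    ranked = [[word, count] for word, count in ordered]
    # Stage 3: keep the first top_n entries of the sorted list.
    return {
        "type": "pipeline_result",
        "in_reply_to": body["msg_id"],
        "stage1": dict(counts),
        "stage2": ranked,
        "stage3": ranked[: body["top_n"]],
    }


reply = build_reply({
    "type": "pipeline",
    "msg_id": 1,
    "lines": ["hello world", "hello there", "world peace"],
    "top_n": 2,
})
# reply["stage2"] == [["hello", 2], ["world", 2], ["peace", 1], ["there", 1]]
# reply["stage3"] == [["hello", 2], ["world", 2]]
```

Negating the count in the sort key gives descending frequency while the word itself stays in ascending order, so a single `sorted` call covers both the primary sort and the tiebreaker.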
OVERVIEW
Theoretical Hub
Key Concepts
pipeline, job chaining, multi-stage processing, intermediate data, top-N, secondary sort
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()