TASK
Implementation
MapReduce splits work into two simple phases: map transforms each input record into key-value pairs, and reduce aggregates all values for the same key.
Your node handles three message types:
// Map a single line into word-count pairs
{ "type": "map", "msg_id": 1, "line": "hello world hello" }
→ { "type": "map_result", "in_reply_to": 1, "pairs": [["hello",1],["world",1],["hello",1]] }
// Reduce a list of values for one key
{ "type": "reduce", "msg_id": 2, "key": "hello", "values": [1,1,1] }
→ { "type": "reduce_result", "in_reply_to": 2, "result": ["hello", 3] }
// Execute a full word-count job over multiple lines
{ "type": "execute", "msg_id": 3, "lines": ["hello world", "hello mapreduce"] }
→ { "type": "job_result", "in_reply_to": 3, "results": {"hello":2,"world":1,"mapreduce":1} }
The execute flow: run map on every line → collect all pairs → group pairs by key → reduce each group → return the final counts.
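The execute flow can be sketched as a handful of pure functions. This is a minimal sketch, not the reference solution: the function names (map_line, shuffle, reduce_values, execute) are hypothetical, and it assumes words are separated by whitespace and compared case-insensitively.

```python
from collections import defaultdict


def map_line(line):
    """Map phase: emit a [word, 1] pair for each whitespace-separated word."""
    # Assumption: words are lowercased so "Hello" and "hello" share a key.
    return [[word, 1] for word in line.lower().split()]


def shuffle(pairs):
    """Group step: collect every value emitted for the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_values(key, values):
    """Reduce phase: sum all values for one key."""
    return [key, sum(values)]


def execute(lines):
    """Run map on every line, group the pairs by key, then reduce each group."""
    pairs = []
    for line in lines:
        pairs.extend(map_line(line))
    results = {}
    for key, values in shuffle(pairs).items():
        _, total = reduce_values(key, values)
        results[key] = total
    return results
```

Keeping the grouping step separate from reduce mirrors the shuffle stage of MapReduce: reduce only ever sees the values for a single key.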
Sample Test Cases
Map word to pairs
Timeout: 5000ms
Input
{
  "src": "client",
  "dest": "mapreduce",
  "body": {
    "type": "map",
    "msg_id": 1,
    "line": "hello world hello"
  }
}
Expected Output
{"type": "map_result", "in_reply_to": 1, "pairs": [["hello", 1], ["world", 1], ["hello", 1]]}
Reduce word counts
Timeout: 5000ms
Input
{
  "src": "client",
  "dest": "mapreduce",
  "body": {
    "type": "reduce",
    "msg_id": 1,
    "key": "hello",
    "values": [
      1,
      1,
      1
    ]
  }
}
Expected Output
{"type": "reduce_result", "in_reply_to": 1, "result": ["hello", 3]}
Hints
Hint 1: Map emits (word, 1) for each word in the input line.
Hint 2: Reduce sums all values for the same key.
Hint 3: execute runs map on each line, groups by key, then reduces.
Hint 4: Use a plain dict/map to accumulate counts during reduce.
Hint 5: Strip and lowercase words before emitting from map.
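Hints 4 and 5 might look like the sketch below. It assumes "strip" means trimming surrounding whitespace; the strip_punctuation flag and the helper names (normalize, map_line, count_pairs) are hypothetical additions for illustration and are not part of the task statement.

```python
import string


def normalize(word, strip_punctuation=False):
    """Trim surrounding whitespace and lowercase a word."""
    word = word.strip().lower()
    if strip_punctuation:
        # Optional and hypothetical: also trim leading/trailing punctuation.
        word = word.strip(string.punctuation)
    return word


def map_line(line):
    """Emit [word, 1] for every non-empty normalized word in the line."""
    pairs = []
    for raw in line.split():
        word = normalize(raw)
        if word:
            pairs.append([word, 1])
    return pairs


def count_pairs(pairs):
    """Accumulate counts per word in a plain dict."""
    counts = {}
    for word, n in pairs:
        counts[word] = counts.get(word, 0) + n
    return counts
```

Whether punctuation should be stripped is not stated in the task, so the flag defaults to off and the sample inputs behave the same either way.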
OVERVIEW
Key Concepts
MapReduce, map phase, reduce phase, word count, key-value pairs, shuffle
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
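One way to fill in the starter is sketched below. It rests on assumptions drawn from the sample tests rather than a stated protocol: each input line is one JSON message, the message may arrive wrapped in a src/dest/body envelope or as a bare body, and the reply is printed as the bare body, as the Expected Output lines show. The names handle and replies are hypothetical, and the task does not say what to do with unknown message types, so this sketch skips them.

```python
import json


def map_line(line):
    # Emit [word, 1] for each lowercased, whitespace-separated word.
    return [[word, 1] for word in line.lower().split()]


def reduce_values(key, values):
    # Sum every value collected for one key.
    return [key, sum(values)]


def handle(message):
    """Build the reply body for one message, or None for an unknown type."""
    # Assumption: accept either an enveloped message or a bare body.
    body = message.get("body", message)
    kind = body.get("type")
    msg_id = body.get("msg_id")
    if kind == "map":
        return {"type": "map_result", "in_reply_to": msg_id,
                "pairs": map_line(body["line"])}
    if kind == "reduce":
        return {"type": "reduce_result", "in_reply_to": msg_id,
                "result": reduce_values(body["key"], body["values"])}
    if kind == "execute":
        groups = {}
        for line in body["lines"]:
            for word, n in map_line(line):
                groups.setdefault(word, []).append(n)
        results = {key: reduce_values(key, values)[1]
                   for key, values in groups.items()}
        return {"type": "job_result", "in_reply_to": msg_id,
                "results": results}
    return None


def replies(lines):
    """Yield one JSON reply string per recognized input line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        reply = handle(json.loads(line))
        if reply is not None:
            yield json.dumps(reply)
```

In main.py, main() could then be reduced to looping over replies(sys.stdin) and calling print(reply, flush=True) for each one. If the test harness expects a full envelope in the reply, handle would need to wrap its result accordingly.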