TASK
Implementation
Implement MapReduce: the Map phase emits (key, value) pairs, the shuffle step groups them by key, and the Reduce phase aggregates each group. Build word count as the example.
Sample Test Cases
Map emits key-value pairs
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"mapreduce_map","msg_id":2,"data":["hello world","hello"],"mapper":"word_count"}}
Expected Output
{"src":"n1","dest":"c0","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}
{"src":"n1","dest":"c1","body":{"type":"mapreduce_map_ok","in_reply_to":2,"msg_id":1,"mapped":[["hello",1],["world",1],["hello",1]]}}
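The sample messages above follow a Maelstrom-style JSON protocol: the node first acknowledges `init`, then answers `mapreduce_map` with the emitted pairs. A minimal handler sketch (the `Node` class and its structure are illustrative, not part of the task's required API; only the `word_count` mapper is supported here):

```python
import json

def word_count_mapper(line):
    # Emit (word, 1) for every whitespace-separated token, lowercased.
    for word in line.split():
        yield (word.lower(), 1)

class Node:
    def __init__(self):
        self.node_id = None
        self.next_msg_id = 0

    def handle(self, msg):
        """Build the reply for one incoming message."""
        body = msg["body"]
        if body["type"] == "init":
            self.node_id = body["node_id"]
            reply_body = {"type": "init_ok", "in_reply_to": body["msg_id"]}
        elif body["type"] == "mapreduce_map":
            # This sketch only knows the word_count mapper.
            mapped = [list(kv) for line in body["data"]
                      for kv in word_count_mapper(line)]
            reply_body = {"type": "mapreduce_map_ok",
                          "in_reply_to": body["msg_id"], "mapped": mapped}
        reply_body["msg_id"] = self.next_msg_id
        self.next_msg_id += 1
        return {"src": self.node_id, "dest": msg["src"], "body": reply_body}

# A real node would loop over sys.stdin, calling
# print(json.dumps(node.handle(json.loads(raw))), flush=True) per line.
```

Feeding the two sample input messages through `handle` reproduces the expected output bodies above, including the `msg_id` counter starting at 0.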
Hints
Hint 1
Map phase: emit key-value pairs
Hint 2
Shuffle: group by key
Hint 3
Reduce phase: aggregate values
OVERVIEW
Theoretical Hub
MapReduce
MapReduce splits a batch job into parallelizable map and reduce phases: Map transforms each input record into (key, value) pairs, Shuffle groups the pairs by key and moves them between phases, and Reduce aggregates the values for each key.
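The three phases can be sketched end to end as plain functions (the names `map_phase`, `shuffle_phase`, and `reduce_phase` are illustrative, not part of the task's API):

```python
from collections import defaultdict
from itertools import chain

def map_phase(data, mapper):
    # Apply the mapper to every input record and flatten the emitted pairs.
    return list(chain.from_iterable(mapper(item) for item in data))

def shuffle_phase(pairs):
    # Group all emitted values by their key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups, reducer):
    # Aggregate each (key, values) group independently.
    return [reducer(key, values) for key, values in groups.items()]

pairs = map_phase(["hello world", "hello"],
                  lambda line: ((w.lower(), 1) for w in line.split()))
counts = reduce_phase(shuffle_phase(pairs), lambda k, vs: (k, sum(vs)))
# counts contains ("hello", 2) and ("world", 1)
```

Because each group is reduced independently, the reduce calls (like the map calls) can run in parallel across workers; shuffle is the only step that moves data between them.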
Key Concepts
MapReduce, batch processing, word count
main.py
#!/usr/bin/env python3
from collections import defaultdict

class MapReduce:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    def map(self, data, mapper):
        # TODO: Apply mapper to each item, collect (key, value) pairs
        pass

    def shuffle(self, mapped):
        # TODO: Group by key
        pass

    def reduce(self, grouped, reducer):
        # TODO: Apply reducer to each group
        pass

    def run(self, data, mapper, reducer):
        mapped = self.map(data, mapper)
        grouped = self.shuffle(mapped)
        return self.reduce(grouped, reducer)

# Word count mapper and reducer
def word_mapper(line):
    for word in line.split():
        yield (word.lower(), 1)

def count_reducer(key, values):
    return (key, sum(values))
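For reference, here is one possible way to fill in the three TODO methods; it is a single-process sketch (the `num_workers` parameter is unused), not the only correct answer:

```python
#!/usr/bin/env python3
from collections import defaultdict

class MapReduce:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers  # unused in this single-process sketch

    def map(self, data, mapper):
        # Apply mapper to each item; mapper yields (key, value) pairs.
        return [pair for item in data for pair in mapper(item)]

    def shuffle(self, mapped):
        # Group values by key.
        grouped = defaultdict(list)
        for key, value in mapped:
            grouped[key].append(value)
        return grouped

    def reduce(self, grouped, reducer):
        # Apply reducer to each (key, values) group.
        return [reducer(key, values) for key, values in grouped.items()]

    def run(self, data, mapper, reducer):
        return self.reduce(self.shuffle(self.map(data, mapper)), reducer)

def word_mapper(line):
    for word in line.split():
        yield (word.lower(), 1)

def count_reducer(key, values):
    return (key, sum(values))

result = MapReduce().run(["hello world", "hello"], word_mapper, count_reducer)
# dict(result) == {"hello": 2, "world": 1}
```

On the sample input from the test case, `map` produces the three pairs shown in `mapreduce_map_ok`, and the full `run` collapses them to the word counts.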