TASK
Implementation
Long-running MapReduce jobs will inevitably encounter worker failures. Fault tolerance means detecting failures quickly and retrying the affected tasks on healthy workers without restarting the entire job.
Your node (the master) must handle these messages:
// Record a heartbeat from a worker
{ "type": "heartbeat", "msg_id": 1, "worker_id": "w1", "timestamp": 1700000000000 }
→ { "type": "heartbeat_ok", "in_reply_to": 1 }
// Check which workers have timed out (no heartbeat for > timeout_ms)
{ "type": "check_failures", "msg_id": 2, "timeout_ms": 5000, "now": 1700000010000 }
→ { "type": "failures_detected", "in_reply_to": 2, "failed_workers": ["w2"] }
// Reassign all tasks from a failed worker to a healthy one
{ "type": "reassign", "msg_id": 3, "failed_worker": "w2", "healthy_worker": "w3",
"tasks": [{"id": "t1", "chunk": ["hello world"]}] }
→ { "type": "reassigned", "in_reply_to": 3, "reassigned_tasks": ["t1"] }
A worker is considered failed when now - last_heartbeat_timestamp > timeout_ms. The comparison is strict, so a gap of exactly timeout_ms does not count as a failure. Tasks assigned to failed workers are retried. Because tasks are idempotent, running them again on a different worker is always safe.
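The message rules above map onto a small state machine keyed by worker id. The following is one possible shape, not a reference solution. The class name Master, the handle method, and the assignments bookkeeping dictionary are hypothetical. The spec also does not fix the order of failed_workers, so sorting them is an assumption made to keep the output deterministic.

```python
from typing import Any


class Master:
    """In-memory master state for the three message types (a sketch)."""

    def __init__(self) -> None:
        # worker_id -> timestamp (ms) of that worker's most recent heartbeat
        self.last_heartbeat: dict[str, int] = {}
        # worker_id -> ids of tasks assigned to it (hypothetical bookkeeping)
        self.assignments: dict[str, list[str]] = {}

    def handle(self, body: dict[str, Any]) -> dict[str, Any]:
        kind, msg_id = body["type"], body["msg_id"]
        if kind == "heartbeat":
            self.last_heartbeat[body["worker_id"]] = body["timestamp"]
            return {"type": "heartbeat_ok", "in_reply_to": msg_id}
        if kind == "check_failures":
            now, timeout_ms = body["now"], body["timeout_ms"]
            # Strict ">" as in the spec: a gap of exactly timeout_ms is healthy.
            failed = sorted(
                w for w, ts in self.last_heartbeat.items() if now - ts > timeout_ms
            )
            return {"type": "failures_detected", "in_reply_to": msg_id,
                    "failed_workers": failed}
        if kind == "reassign":
            task_ids = [task["id"] for task in body["tasks"]]
            # Assumption: everything the failed worker held moves away from it.
            self.assignments.pop(body["failed_worker"], None)
            # Idempotent tasks can safely be re-run on the healthy worker.
            self.assignments.setdefault(body["healthy_worker"], []).extend(task_ids)
            return {"type": "reassigned", "in_reply_to": msg_id,
                    "reassigned_tasks": task_ids}
        raise ValueError(f"unknown message type: {kind!r}")
```

The replies are built in the same key order as the examples, so passing one to json.dumps reproduces the sample expected output. In this sketch a worker stays in last_heartbeat after it has been reported, so it keeps appearing as failed until it sends a fresh heartbeat. The task does not say whether that behavior is desired.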
Sample Test Cases
Record heartbeat
Timeout: 5000ms
Input
{
"src": "w1",
"dest": "master",
"body": {
"type": "heartbeat",
"msg_id": 1,
"worker_id": "w1",
"timestamp": 1700000000000
}
}
Expected Output
{"type": "heartbeat_ok", "in_reply_to": 1}
Detect timed-out worker
Timeout: 5000ms
Input
{"src":"w1","dest":"master","body":{"type":"heartbeat","msg_id":1,"worker_id":"w1","timestamp":1700000000000}}
{"src":"monitor","dest":"master","body":{"type":"check_failures","msg_id":2,"timeout_ms":5000,"now":1700000010000}}
Expected Output
{"type": "heartbeat_ok", "in_reply_to": 1}
{"type": "failures_detected", "in_reply_to": 2, "failed_workers": ["w1"]}
Hints
Hint 1
A worker has failed if now - last_heartbeat > timeout_ms.
Hint 2
On failure, find all tasks assigned to that worker and re-queue them.
Hint 3
Tasks must be idempotent: re-running a task produces the same result.
Hint 4
Speculative execution: if a task has been running too long, start a backup copy on a second worker.
Hint 5
Use a per-task attempt counter; keep the first result that arrives and drop the one from the slower duplicate.
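Hints 4 and 5 can be sketched as a tracker that launches one backup attempt for a straggling task and keeps whichever result arrives first. The names SpeculativeTracker and TaskState, the slow_ms threshold, and the one-backup-per-task policy are assumptions for illustration. None of them are part of the message protocol above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskState:
    started_at: int  # ms timestamp of the first attempt
    attempts: dict[int, str] = field(default_factory=dict)  # attempt number -> worker_id
    winner: Optional[int] = None  # attempt whose result was kept
    result: object = None


class SpeculativeTracker:
    """Hypothetical helper: backs up slow tasks and keeps only the first result."""

    def __init__(self, slow_ms: int) -> None:
        self.slow_ms = slow_ms  # assumed threshold for "running too long"
        self.tasks: dict[str, TaskState] = {}

    def start(self, task_id: str, worker_id: str, now: int) -> int:
        """Launch an attempt of task_id on worker_id and return its attempt number."""
        state = self.tasks.setdefault(task_id, TaskState(started_at=now))
        attempt = len(state.attempts) + 1
        state.attempts[attempt] = worker_id
        return attempt

    def stragglers(self, now: int) -> list[str]:
        """Unfinished tasks with a single attempt that have run longer than slow_ms."""
        return sorted(
            tid
            for tid, s in self.tasks.items()
            if s.winner is None
            and len(s.attempts) == 1
            and now - s.started_at > self.slow_ms
        )

    def complete(self, task_id: str, attempt: int, result: object) -> bool:
        """Record a finished attempt; return False if its result is dropped."""
        state = self.tasks[task_id]
        if attempt not in state.attempts:
            raise KeyError(f"unknown attempt {attempt} for task {task_id!r}")
        if state.winner is not None:
            return False  # a faster attempt already finished: drop this duplicate
        state.winner, state.result = attempt, result
        return True
```

Dropping the slower result is safe only because tasks are idempotent: both attempts compute the same output, so keeping either one yields the same answer.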
OVERVIEW
Key Concepts
fault tolerance, worker failure, task retry, heartbeat, speculative execution, idempotence
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    # (as given, this loop just echoes each incoming message back unchanged)
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
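The starter echoes each full envelope back, while the sample expected outputs contain only the reply body. A replacement for the starter's main() might look like the following, assuming one JSON envelope per input line. The names serve and heartbeat_only are hypothetical, and heartbeat_only is a placeholder meant to be swapped for a handler that covers all three message types. The starter's existing if __name__ == "__main__": main() guard at the bottom of the file still runs it.

```python
import json
import sys
from typing import Any, Callable, Iterable, Iterator

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def serve(lines: Iterable[str], handle: Handler) -> Iterator[str]:
    """Yield one JSON reply per non-blank input envelope."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        envelope = json.loads(line)
        # Only the body is interpreted; src and dest are routing metadata.
        reply = handle(envelope["body"])
        # Default separators produce '{"type": "heartbeat_ok", "in_reply_to": 1}',
        # the same spacing as the sample expected output.
        yield json.dumps(reply)


def heartbeat_only(body: dict[str, Any]) -> dict[str, Any]:
    # Placeholder handler: replace with one that covers every message type.
    if body["type"] == "heartbeat":
        return {"type": "heartbeat_ok", "in_reply_to": body["msg_id"]}
    raise ValueError(f"unhandled message type: {body['type']!r}")


def main() -> None:
    for out in serve(sys.stdin, heartbeat_only):
        print(out, flush=True)
```

Yielding replies from serve instead of printing inside it keeps the envelope parsing testable without touching stdin.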