TASK
Implementation
In distributed systems, messages can be lost or delayed indefinitely. A single RPC with a timeout is not enough: you need a retry loop to handle transient failures.
Your task is to implement an rpc_with_retry method that:
- Sends an RPC to the target node
- Waits for a reply with a timeout (default: 500ms)
- If no reply arrives, retries the RPC (with a new msg_id)
- Gives up after max_retries attempts (default: 3)
- Returns the response body on success, or None after all retries fail
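The steps in the list above can be sketched as a small loop. This is a minimal illustration, not the reference solution: it is written as a free function that takes the blocking call as a parameter (`sync_rpc` here is any callable that returns a reply body or None on timeout), and it returns a `(result, attempts)` tuple as the starter code's TODO comment suggests, so the caller can report the attempt count.

```python
import sys
from typing import Callable, Optional, Tuple


def rpc_with_retry(
    sync_rpc: Callable[[str, dict, float], Optional[dict]],
    dest: str,
    body: dict,
    timeout: float = 0.5,
    max_retries: int = 3,
) -> Tuple[Optional[dict], int]:
    """Call sync_rpc up to max_retries times and return (result, attempts)."""
    for attempt in range(1, max_retries + 1):
        # Pass a fresh copy so each attempt can be stamped with its own msg_id.
        result = sync_rpc(dest, dict(body), timeout)
        if result is not None:
            return result, attempt
        # Log to stderr so stdout stays reserved for protocol messages.
        print(f"attempt {attempt}/{max_retries} to {dest} timed out", file=sys.stderr)
    return None, max_retries


if __name__ == "__main__":
    calls = []

    def flaky_sync_rpc(dest, body, timeout):
        # Hypothetical stand-in: times out twice, then replies.
        calls.append(body)
        return {"type": "read_ok"} if len(calls) == 3 else None

    print(rpc_with_retry(flaky_sync_rpc, "n2", {"type": "read"}))
```

Inside the Node class, the same loop would call `self.sync_rpc` directly. Because a lost reply is indistinguishable from a lost request, the target may process the same payload more than once, which is what at-least-once delivery means in practice.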
Implement a relay message type to test this: when your node receives a relay request, it uses rpc_with_retry to forward a message to the target node.
Request: {"type": "relay", "msg_id": 1, "target": "n2", "payload": {"type": "read"}}
Response: {"type": "relay_ok", "in_reply_to": 1, "attempts": 3, "result": {...}}
The response should include the number of attempts made. If all retries fail, respond with an error:
{"type": "error", "in_reply_to": 1, "code": 0, "text": "RPC failed after 3 attempts"}
Sample Test Cases
Init and echo still work with retry node
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2"]}}
{"src":"c1","dest":"n1","body":{"type":"echo","msg_id":2,"echo":"retry-test"}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "echo_ok", "echo": "retry-test", "in_reply_to": 2, "msg_id": 1}}
Relay emits outgoing RPC message
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2"]}}
{"src":"c1","dest":"n1","body":{"type":"relay","msg_id":2,"target":"n2","payload":{"type":"read"}}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "n2", "body": {"type": "read", "msg_id": 1}}
Hints
Hint 1: Wrap your sync_rpc in a loop that retries on timeout
Hint 2: Track the number of attempts and give up after max_retries
Hint 3: Each retry should use a new msg_id for the outgoing message
Hint 4: Log each retry attempt to stderr for observability
Hint 5: Return the first successful response or an error after exhausting retries
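The two relay response formats from the task can be derived from the `(result, attempts)` pair that the retry loop produces. The helper below is a hypothetical sketch (`build_relay_reply` is not part of the starter code). It leaves out `in_reply_to` and `msg_id` on the assumption that the starter's `reply` and `send` methods add them.

```python
from typing import Optional


def build_relay_reply(result: Optional[dict], attempts: int) -> dict:
    """Return the body to send back to the client that issued the relay."""
    if result is None:
        # Every attempt timed out: report an error with the attempt count.
        return {
            "type": "error",
            "code": 0,
            "text": f"RPC failed after {attempts} attempts",
        }
    # Success: include how many attempts it took and the target's reply.
    return {"type": "relay_ok", "attempts": attempts, "result": result}


if __name__ == "__main__":
    print(build_relay_reply({"type": "read_ok"}, 2))
    print(build_relay_reply(None, 3))
```

A relay handler would then forward the request's `payload` to its `target` through the retry loop and pass the outcome to a helper like this one before replying.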
OVERVIEW
Key Concepts
timeout, retry logic, fault tolerance, at-least-once delivery
main.py
python
#!/usr/bin/env python3
import sys
import json
import threading
import time


class Node:
    def __init__(self):
        self.node_id = None
        self.node_ids = []
        self.next_msg_id = 0
        self.lock = threading.Lock()
        self.pending = {}  # msg_id -> threading.Event for RPCs awaiting a reply
        self.replies = {}  # msg_id -> reply body for a pending RPC

    def send(self, dest, body):
        # Stamp the body with a fresh msg_id and write the message to stdout.
        with self.lock:
            body["msg_id"] = self.next_msg_id
            msg_id = self.next_msg_id
            self.next_msg_id += 1
            message = {"src": self.node_id, "dest": dest, "body": body}
            print(json.dumps(message), flush=True)
        return msg_id

    def reply(self, request, body):
        body["in_reply_to"] = request["body"]["msg_id"]
        self.send(request["src"], body)

    def sync_rpc(self, dest, body, timeout=0.5):
        # Send a copy of body, then block until a reply arrives or the timeout expires.
        event = threading.Event()
        msg_id = self.send(dest, dict(body))
        self.pending[msg_id] = event
        event.wait(timeout=timeout)
        result = self.replies.pop(msg_id, None)
        self.pending.pop(msg_id, None)
        return result  # None means no reply arrived in time

    def rpc_with_retry(self, dest, body, timeout=0.5, max_retries=3):
        # TODO: Implement retry loop around sync_rpc
        # Return (result, attempts) tuple
        raise NotImplementedError
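The starter's sync_rpc waits on an event in `pending` and then reads from `replies`, but the excerpt does not show what fills those dictionaries. One plausible arrangement, sketched below under that assumption, is a reader loop that routes any message carrying `in_reply_to` to its waiter and runs every other request on a worker thread, so a handler blocked in a retry loop does not stop replies from being read. `ReplyTable`, `deliver`, and `dispatch` are hypothetical names, not part of the starter code.

```python
import json
import threading


class ReplyTable:
    """Hypothetical stand-in for the pending/replies dicts on Node."""

    def __init__(self):
        self.lock = threading.Lock()
        self.pending = {}  # msg_id -> threading.Event
        self.replies = {}  # msg_id -> reply body

    def deliver(self, body):
        """Route a reply to its waiter; return False if nobody is waiting."""
        msg_id = body.get("in_reply_to")
        with self.lock:
            event = self.pending.get(msg_id)
            if event is None:
                return False  # late reply to an attempt that already timed out
            self.replies[msg_id] = body
        event.set()
        return True


def dispatch(line, table, handle_request):
    """Handle one input line: wake a waiter, or run the request on a thread."""
    message = json.loads(line)
    if "in_reply_to" in message["body"]:
        table.deliver(message["body"])
        return None
    worker = threading.Thread(target=handle_request, args=(message,), daemon=True)
    worker.start()
    return worker


if __name__ == "__main__":
    table = ReplyTable()
    waiter = threading.Event()
    table.pending[1] = waiter
    dispatch('{"src":"n2","dest":"n1","body":{"type":"read_ok","in_reply_to":1}}',
             table, lambda message: None)
    print(waiter.is_set(), table.replies)
```

Because each retry uses a new msg_id, a reply to an earlier attempt that arrives after its timeout finds no waiter and is dropped in this sketch rather than being mistaken for the answer to the current attempt.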