TASK
Implementation
Synchronous RPC blocks the caller until a reply arrives, which prevents the node from handling other messages during that time. In high-throughput distributed systems, asynchronous RPC is preferred.
Your task is to implement an async_rpc method that:
- Sends a message to a target node
- Registers a callback function keyed by the outgoing
msg_id - Returns immediately (non-blocking)
- When a reply arrives (with matching
in_reply_to), invokes the callback with the reply body
Implement a batch_echo message type: the node receives a list of strings, sends an echo RPC for each one to itself (loopback), and collects all replies using callbacks. Once all replies are collected, respond with the results.
Request: {"type": "batch_echo", "msg_id": 1, "values": ["a", "b", "c"]}
Response: {"type": "batch_echo_ok", "in_reply_to": 1, "results": ["a", "b", "c"]}For testing, the node should echo to itself (src and dest are the same node).
Sample Test Cases
Init and echo still workTimeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"echo","msg_id":2,"echo":"async"}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "echo_ok", "echo": "async", "in_reply_to": 2, "msg_id": 1}}
Callback registration sends RPCTimeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"batch_echo","msg_id":2,"values":["x"]}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "n1", "body": {"type": "echo", "echo": "x", "msg_id": 1}}
Hints
Hint 1▾
Store callbacks in a dictionary keyed by msg_id
Hint 2▾
When a reply arrives, look up the callback by in_reply_to and invoke it
Hint 3▾
The callback should receive the reply body as its argument
Hint 4▾
Use a handler map to dispatch different message types
Hint 5▾
Async RPC allows the node to continue processing other messages while waiting
OVERVIEW
Theoretical Hub
Concept overview coming soon
Key Concepts
asynchronous programmingcallbacksnon-blocking I/Oevent-driven
main.py
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python3
import sys
import json
class Node:
def __init__(self):
self.node_id = None
self.node_ids = []
self.next_msg_id = 0
self.callbacks = {} # msg_id -> callback function
def send(self, dest, body):
body["msg_id"] = self.next_msg_id
msg_id = self.next_msg_id
self.next_msg_id += 1
message = {"src": self.node_id, "dest": dest, "body": body}
print(json.dumps(message), flush=True)
return msg_id
def reply(self, request, body):
body["in_reply_to"] = request["body"]["msg_id"]
self.send(request["src"], body)
def async_rpc(self, dest, body, callback):
# TODO: Send message and register callback for the reply
# The callback should be called with the reply body when it arrives
pass
def handle_reply(self, body):
# TODO: Look up callback by in_reply_to and invoke it
pass
def main():
node = Node()
for line in sys.stdin: