TASK
Implementation
When a node sends an async RPC but the recipient crashes or the network drops the message, the callback stays in memory forever. This is a resource leak that can eventually consume all available memory.
Your task is to implement a callback reaper that:
- Records the timestamp when each callback is registered
- Periodically scans for callbacks older than a threshold (default: 2 seconds)
- Removes expired callbacks and invokes them with a timeout error
- Reports how many callbacks were reaped
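The four steps above can be sketched as one function. This is a minimal sketch under stated assumptions, not the reference solution: the helper name `reap_expired`, the `now` parameter (added to make it testable), and the `{"type": "error", "text": "timeout"}` body handed to reaped callbacks are all hypothetical. The two dictionaries mirror the `callbacks` and `callback_times` fields of the starter code.

```python
import time


def reap_expired(callbacks, callback_times, threshold=2.0, now=None):
    """Remove callbacks registered more than `threshold` seconds ago.

    Each reaped callback is invoked with a timeout error body (the body
    shape is an assumption). Returns the number of callbacks reaped.
    """
    if now is None:
        now = time.time()
    # Collect the expired ids first so the dicts are not mutated mid-iteration.
    expired = [mid for mid, registered in callback_times.items()
               if now - registered > threshold]
    reaped = 0
    for mid in expired:
        callback = callbacks.pop(mid, None)
        callback_times.pop(mid, None)
        if callback is not None:
            # Signal the timeout to whoever was waiting on this RPC.
            callback({"type": "error", "in_reply_to": mid, "text": "timeout"})
            reaped += 1
    return reaped
```

Popping the entry before invoking the callback means a callback that raises cannot leave a stale entry behind for the next scan.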
Implement a pending_count message type that returns the number of currently pending callbacks:
Request: {"type": "pending_count", "msg_id": 1}
Response: {"type": "pending_count_ok", "in_reply_to": 1, "count": 5}
Also implement a send_fire_forget message type that sends an RPC without expecting a reply (to simulate leaked callbacks):
Request: {"type": "send_fire_forget", "msg_id": 1, "target": "n2", "payload": {"type": "echo", "echo": "lost"}}
Response: {"type": "send_fire_forget_ok", "in_reply_to": 1, "pending": 1}
Sample Test Cases
Pending count starts at zero
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"pending_count","msg_id":2}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "pending_count_ok", "count": 0, "in_reply_to": 2, "msg_id": 1}}
Fire-and-forget increases pending count
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2"]}}
{"src":"c1","dest":"n1","body":{"type":"send_fire_forget","msg_id":2,"target":"n2","payload":{"type":"echo","echo":"lost"}}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "n2", "body": {"type": "echo", "echo": "lost", "msg_id": 1}}
{"src": "n1", "dest": "c1", "body": {"type": "send_fire_forget_ok", "pending": 1, "in_reply_to": 2, "msg_id": 2}}
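The two message types exercised by the test cases above could be handled roughly as follows. This is a sketch under assumptions: `MiniNode` is a hypothetical, stripped-down stand-in for the starter `Node`, the handler names are invented for illustration, and outgoing messages are collected in an `outbox` list rather than printed so the behaviour is easy to inspect.

```python
import time


class MiniNode:
    """Hypothetical stand-in for the starter Node; collects output in a list."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.next_msg_id = 0
        self.callbacks = {}        # msg_id -> callback
        self.callback_times = {}   # msg_id -> registration timestamp
        self.outbox = []           # assumption: captured instead of printed

    def send(self, dest, body):
        msg_id = self.next_msg_id
        self.next_msg_id += 1
        body = dict(body, msg_id=msg_id)  # copy so the caller's dict is untouched
        self.outbox.append({"src": self.node_id, "dest": dest, "body": body})
        return msg_id

    def reply(self, request, body):
        in_reply_to = request["body"]["msg_id"]
        self.send(request["src"], dict(body, in_reply_to=in_reply_to))

    def handle_pending_count(self, request):
        # The count is simply the size of the callbacks table.
        self.reply(request, {"type": "pending_count_ok",
                             "count": len(self.callbacks)})

    def handle_send_fire_forget(self, request):
        body = request["body"]
        # Forward the payload and register a no-op callback; if no reply
        # ever arrives, the entry stays pending until it is reaped.
        msg_id = self.send(body["target"], body["payload"])
        self.callbacks[msg_id] = lambda reply: None
        self.callback_times[msg_id] = time.time()
        self.reply(request, {"type": "send_fire_forget_ok",
                             "pending": len(self.callbacks)})
```

Forwarding the payload before replying matches the message order and `msg_id` values in the second sample test: the echo to `n2` takes `msg_id` 1 and the `send_fire_forget_ok` reply takes `msg_id` 2.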
Hints
Hint 1
Store the timestamp when each callback is registered
Hint 2
Periodically scan the callbacks dictionary for expired entries
Hint 3
Use time.time() to get the current timestamp in seconds
Hint 4
A reaper interval of 500ms is a good starting point
Hint 5
When reaping, invoke the callback with an error or None to signal timeout
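One way to run the periodic scan suggested by the hints is a small background thread. This is only a sketch: the class name `PeriodicReaper` and its `reap_fn` parameter are hypothetical, and it assumes `reap_fn` returns the number of callbacks it reaped. If the main loop also touches the callbacks dictionary, both sides would additionally need to share a lock, which is not shown here.

```python
import threading


class PeriodicReaper:
    """Calls `reap_fn` every `interval` seconds on a daemon thread."""

    def __init__(self, reap_fn, interval=0.5):
        self.reap_fn = reap_fn
        self.interval = interval      # 500 ms, as suggested in the hints
        self.total_reaped = 0
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called,
        # so the loop wakes every `interval` seconds until it is stopped.
        while not self._stop.wait(self.interval):
            self.total_reaped += self.reap_fn()
```

Waiting on an `Event` rather than calling `time.sleep` lets `stop()` interrupt the wait immediately instead of after a full interval.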
OVERVIEW
Key Concepts
resource cleanup, memory leaks, periodic tasks, garbage collection
main.py
python
#!/usr/bin/env python3
import sys
import json
import time


class Node:
    def __init__(self):
        self.node_id = None
        self.node_ids = []
        self.next_msg_id = 0
        self.callbacks = {}        # msg_id -> callback
        self.callback_times = {}   # msg_id -> registration timestamp
        self.reap_threshold = 2.0  # seconds

    def send(self, dest, body):
        body["msg_id"] = self.next_msg_id
        msg_id = self.next_msg_id
        self.next_msg_id += 1
        message = {"src": self.node_id, "dest": dest, "body": body}
        print(json.dumps(message), flush=True)
        return msg_id

    def reply(self, request, body):
        body["in_reply_to"] = request["body"]["msg_id"]
        self.send(request["src"], body)

    def async_rpc(self, dest, body, callback):
        msg_id = self.send(dest, body)
        self.callbacks[msg_id] = callback
        self.callback_times[msg_id] = time.time()

    def handle_reply(self, body):
        reply_to = body.get("in_reply_to")
        if reply_to is not None and reply_to in self.callbacks:
            callback = self.callbacks.pop(reply_to)
            self.callback_times.pop(reply_to, None)
            callback(body)

    def reap_expired_callbacks(self):
        # TODO: Find and remove callbacks older than self.reap_threshold