ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/messenger/tasks/task-1-2-4-callback-reaper
TASK

Implementation

When a node sends an async RPC but the recipient crashes or the network drops the message, the callback stays in memory forever. This is a resource leak that can eventually consume all available memory.

Your task is to implement a callback reaper that:

  1. Records the timestamp when each callback is registered
  2. Periodically scans for callbacks older than a threshold (default: 2 seconds)
  3. Removes expired callbacks and invokes them with a timeout error
  4. Reports how many callbacks were reaped

Implement a pending_count message type that returns the number of currently pending callbacks:

Request:  {"type": "pending_count", "msg_id": 1}
Response: {"type": "pending_count_ok", "in_reply_to": 1, "count": 5}

Also implement a send_fire_forget message type that sends an RPC without expecting a reply (to simulate leaked callbacks):

Request:  {"type": "send_fire_forget", "msg_id": 1, "target": "n2", "payload": {"type": "echo", "echo": "lost"}}
Response: {"type": "send_fire_forget_ok", "in_reply_to": 1, "pending": 1}

Sample Test Cases

Pending count starts at zeroTimeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"pending_count","msg_id":2}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "pending_count_ok", "count": 0, "in_reply_to": 2, "msg_id": 1}}
Fire-and-forget increases pending countTimeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2"]}}
{"src":"c1","dest":"n1","body":{"type":"send_fire_forget","msg_id":2,"target":"n2","payload":{"type":"echo","echo":"lost"}}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "n2", "body": {"type": "echo", "echo": "lost", "msg_id": 1}}
{"src": "n1", "dest": "c1", "body": {"type": "send_fire_forget_ok", "pending": 1, "in_reply_to": 2, "msg_id": 2}}

Hints

Hint 1
Store the timestamp when each callback is registered
Hint 2
Periodically scan the callbacks dictionary for expired entries
Hint 3
Use time.time() to get the current timestamp in seconds
Hint 4
A reaper interval of 500ms is a good starting point
Hint 5
When reaping, invoke the callback with an error or None to signal timeout
OVERVIEW

Theoretical Hub

Concept overview coming soon

Key Concepts

resource cleanupmemory leaksperiodic tasksgarbage collection
main.py
python
Implement Callback Reaper for Leaked RPCs - The Messenger | Build Distributed Systems