TASK
Implementation
The In-Sync Replica (ISR) set is Kafka's mechanism for balancing durability and availability. It tracks which replicas are "caught up" with the leader and determines the durability guarantee for writes.
ISR behavior:
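- Write with acks=all: the leader waits until ALL current ISR members have replicated the message, then acknowledges the producer. An acknowledged message survives as long as at least one ISR member does; with the full three-replica ISR, that covers any single broker failure.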
- Follower falls behind: if a follower's replication lag exceeds replica.lag.time.max.ms (10s in this task, which was Kafka's default before version 2.5; newer versions default to 30s), the leader removes it from the ISR.
- ISR shrinks: with fewer ISR members, writes are acknowledged once fewer replicas hold them. Durability is reduced but availability is maintained.
- Follower catches up: when the slow follower catches up to the leader's log end offset, it is added back to the ISR.
- Min ISR: min.insync.replicas (e.g. 2) rejects acks=all writes when the ISR drops below that size, trading availability for durability.
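The rules above can be sketched as two small pure functions. This is a simplified model, not Kafka's broker code: the function names, the string return values, and the per-node lag dictionary are assumptions made for illustration, and the defaults (10s, min ISR of 2) are the values used in this task.

```python
def shrink_isr(isr, leader, lag_s_by_node, lag_threshold_s=10.0):
    """Return the ISR with every follower whose lag exceeds the threshold removed.

    lag_s_by_node is a hypothetical map of node id -> replication lag in seconds;
    nodes missing from it are treated as fully caught up. The leader always stays.
    """
    return [
        node for node in isr
        if node == leader or lag_s_by_node.get(node, 0.0) <= lag_threshold_s
    ]


def acks_all_decision(isr, acked_by, min_insync_replicas=2):
    """Decide what the leader does with an acks=all write.

    isr      -- node ids currently in sync, leader included
    acked_by -- node ids that have already replicated the record
    """
    if len(isr) < min_insync_replicas:
        # Too few in-sync replicas: refuse the write rather than weaken durability.
        return "reject"
    if set(isr) <= set(acked_by):
        # Every current ISR member has the record, so the producer can be acknowledged.
        return "acknowledge"
    return "wait"


if __name__ == "__main__":
    isr = ["n1", "n2", "n3"]
    print(acks_all_decision(isr, ["n1", "n2"]))        # n3 has not replicated yet
    isr = shrink_isr(isr, "n1", {"n3": 15})            # n3 is 15s behind
    print(isr, acks_all_decision(isr, ["n1", "n2"]))   # n3 no longer counts
```

Note how the same two acknowledgements go from "wait" to "acknowledge" once n3 leaves the ISR: acks=all tracks the current ISR, not the full replica list.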
Request: {"type": "isr_status", "msg_id": 1, "topic": "orders", "partition": 0}
Response: {"type": "isr_status_ok", "in_reply_to": 1, "leader": "n1", "isr": ["n1", "n2", "n3"], "out_of_sync": []}
Request: {"type": "isr_simulate_lag", "msg_id": 2, "node": "n3", "lag_seconds": 15}
Response: {"type": "isr_simulate_lag_ok", "in_reply_to": 2, "removed_from_isr": true, "new_isr": ["n1", "n2"], "reason": "lag_15s_exceeds_threshold_10s"}
Request: {"type": "isr_recover", "msg_id": 3, "node": "n3"}
Response: {"type": "isr_recover_ok", "in_reply_to": 3, "added_to_isr": true, "new_isr": ["n1", "n2", "n3"]}
Sample Test Cases
All replicas initially in sync
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2","n3"]}}
{"src":"c1","dest":"n1","body":{"type":"isr_status","msg_id":2,"topic":"orders","partition":0}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
Lagging node removed from ISR
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1","n2","n3"]}}
{"src":"c1","dest":"n1","body":{"type":"isr_simulate_lag","msg_id":2,"node":"n3","lag_seconds":15}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
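The expected output in both test cases is an envelope whose src and dest are swapped relative to the request and whose body carries in_reply_to plus a msg_id of 0. A minimal sketch of building such a reply follows. The Node class and its method names are hypothetical, and the idea that msg_id is a per-node counter starting at 0 is an assumption inferred from the sample output, not something the task states.

```python
import itertools
import json


class Node:
    """Hypothetical holder for node identity and an outgoing message counter."""

    def __init__(self):
        self.node_id = None
        self.node_ids = []
        # Assumption: replies are numbered 0, 1, 2, ... in the order they are sent.
        self._next_msg_id = itertools.count(0)

    def reply(self, request, body):
        """Wrap a response body in an envelope addressed back to the sender."""
        out_body = dict(body)
        out_body["in_reply_to"] = request["body"]["msg_id"]
        out_body["msg_id"] = next(self._next_msg_id)
        return {"src": request["dest"], "dest": request["src"], "body": out_body}

    def handle_init(self, request):
        """Record the cluster membership from init and answer with init_ok."""
        body = request["body"]
        self.node_id = body["node_id"]
        self.node_ids = list(body["node_ids"])
        return self.reply(request, {"type": "init_ok"})


if __name__ == "__main__":
    line = ('{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,'
            '"node_id":"n1","node_ids":["n1","n2","n3"]}}')
    node = Node()
    print(json.dumps(node.handle_init(json.loads(line))), flush=True)
```

With json.dumps defaults, the printed line has the same key order and spacing as the expected output shown above; whether the checker compares strings or parsed JSON is not specified in the task.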
Hints
Hint 1
The ISR is the set of replicas that are "caught up" with the leader (within a lag threshold)
Hint 2
With acks=all, the leader only acknowledges a write after ALL ISR members have replicated it
Hint 3
Remove a follower from ISR if it falls more than 10 seconds behind (replica.lag.time.max.ms)
Hint 4
When a slow follower catches up, add it back to the ISR
Hint 5
acks=all means "all ISR members", NOT "all replicas": shrinking the ISR reduces the durability guarantee
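Putting the hints together, the three request types from the task could be handled along these lines. This is a sketch under assumptions: the state dictionary layout and the function name are made up for illustration, the function returns only the response body (in_reply_to and the src/dest envelope are left to the caller), and the "reason" field is omitted when nothing is removed because the task does not show that case.

```python
LAG_THRESHOLD_S = 10  # mirrors the 10s replica.lag.time.max.ms used in the task


def handle_isr_body(state, body):
    """Return the response body for one ISR request, mutating `state` in place.

    state is a hypothetical dict: {"leader": str, "replicas": [...], "isr": [...]}.
    """
    kind = body["type"]

    if kind == "isr_status":
        out_of_sync = [n for n in state["replicas"] if n not in state["isr"]]
        return {"type": "isr_status_ok", "leader": state["leader"],
                "isr": list(state["isr"]), "out_of_sync": out_of_sync}

    if kind == "isr_simulate_lag":
        node, lag = body["node"], body["lag_seconds"]
        # The leader is never dropped; followers are dropped only above the threshold.
        removed = (node in state["isr"] and node != state["leader"]
                   and lag > LAG_THRESHOLD_S)
        if removed:
            state["isr"].remove(node)
        resp = {"type": "isr_simulate_lag_ok", "removed_from_isr": removed,
                "new_isr": list(state["isr"])}
        if removed:
            resp["reason"] = f"lag_{lag}s_exceeds_threshold_{LAG_THRESHOLD_S}s"
        return resp

    if kind == "isr_recover":
        node = body["node"]
        added = node in state["replicas"] and node not in state["isr"]
        if added:
            state["isr"].append(node)
            # Keep the ISR in the same order as the replica list.
            state["isr"].sort(key=state["replicas"].index)
        return {"type": "isr_recover_ok", "added_to_isr": added,
                "new_isr": list(state["isr"])}

    raise ValueError(f"unsupported message type: {kind}")


if __name__ == "__main__":
    state = {"leader": "n1", "replicas": ["n1", "n2", "n3"], "isr": ["n1", "n2", "n3"]}
    print(handle_isr_body(state, {"type": "isr_simulate_lag", "node": "n3", "lag_seconds": 15}))
    print(handle_isr_body(state, {"type": "isr_recover", "node": "n3"}))
```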
Key Concepts
ISR, in-sync replicas, replication lag, acks=all, durability guarantee
main.py
python
#!/usr/bin/env python3
import sys
import json

def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)

if __name__ == "__main__":
    main()