TASK
Implementation
Consumer group rebalancing ensures that partitions are evenly distributed among consumers. When the group membership changes (a consumer joins, leaves, or crashes), the partitions must be reassigned.
The rebalancing protocol:
- Trigger: a consumer joins the group, leaves the group, or is removed (heartbeat timeout)
- Stop: all consumers in the group stop reading (consumption is paused)
- Elect leader: the group coordinator (a broker) elects one consumer as the group leader
- Assign: the leader runs the assignment strategy and assigns partitions to consumers
- Resume: all consumers receive their new assignments and resume reading
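The five steps above can be sketched as a toy, in-memory coordinator. This is only an illustration under stated assumptions: `GroupCoordinator`, `contiguous_ranges`, and the `log` list are hypothetical names, and picking the smallest consumer ID as leader is an assumed rule, since the text does not say how the leader is chosen.

```python
def contiguous_ranges(consumers, partitions):
    """Hypothetical helper: contiguous ranges, extras go to the first consumers."""
    consumers, partitions = sorted(consumers), sorted(partitions)
    base, extra = divmod(len(partitions), len(consumers))
    # bounds[i] is the index where consumer i's slice starts.
    bounds = [i * base + min(i, extra) for i in range(len(consumers) + 1)]
    return {c: partitions[bounds[i]:bounds[i + 1]] for i, c in enumerate(consumers)}


class GroupCoordinator:
    """Toy stand-in for the broker that coordinates one consumer group."""

    def __init__(self, partitions, strategy=contiguous_ranges):
        self.partitions = list(partitions)
        self.strategy = strategy
        self.members = set()
        self.leader = None
        self.assignments = {}
        self.log = []  # records the protocol steps, for illustration only

    def join(self, consumer_id):
        self.members.add(consumer_id)
        self._rebalance(f"{consumer_id} joined")

    def leave(self, consumer_id):
        # Also models removal after a heartbeat timeout.
        self.members.discard(consumer_id)
        self._rebalance(f"{consumer_id} left")

    def _rebalance(self, reason):
        self.log.append(f"trigger: {reason}")
        self.log.append("stop: consumption paused")
        if not self.members:
            self.leader, self.assignments = None, {}
            return
        # Assumption: the smallest ID becomes leader.
        self.leader = min(self.members)
        self.log.append(f"elect leader: {self.leader}")
        self.assignments = self.strategy(self.members, self.partitions)
        self.log.append("assign: leader computed assignments")
        self.log.append("resume: consumers read their new partitions")


coordinator = GroupCoordinator(partitions=range(6))
for consumer in ("c1", "c2", "c3"):
    coordinator.join(consumer)
print(coordinator.assignments)  # {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
coordinator.leave("c3")
print(coordinator.assignments)  # {'c1': [0, 1, 2], 'c2': [3, 4, 5]}
```

Every membership change reruns the whole assignment, which is why consumption pauses for the entire group rather than only for the affected consumer.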
Range assignment strategy (the simplest):
- Sort the partition IDs and consumer IDs
- Divide partitions into contiguous ranges
- Example: 6 partitions, 3 consumers -> c1: [0,1], c2: [2,3], c3: [4,5]
- With uneven division, the first consumers in sorted order each take one extra partition: 7 partitions, 3 consumers -> c1: [0,1,2], c2: [3,4], c3: [5,6]
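One possible way to write the range strategy is shown below. It is a minimal sketch: the function name `range_assign` is hypothetical, and giving the leftover partitions to the first consumers in sorted order is inferred from the 7-partition example above.

```python
def range_assign(consumers, partitions):
    """Assign sorted partitions to sorted consumers in contiguous ranges."""
    consumers = sorted(consumers)
    partitions = sorted(partitions)
    if not consumers:
        return {}
    # Every consumer gets `base` partitions; the first `extra` get one more.
    base, extra = divmod(len(partitions), len(consumers))
    assignments = {}
    start = 0
    for index, consumer in enumerate(consumers):
        size = base + (1 if index < extra else 0)
        assignments[consumer] = partitions[start:start + size]
        start += size
    return assignments


print(range_assign(["c1", "c2", "c3"], [0, 1, 2, 3, 4, 5]))
# {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
print(range_assign(["c3", "c1", "c2"], range(7)))
# {'c1': [0, 1, 2], 'c2': [3, 4], 'c3': [5, 6]}
```

Sorting both inputs first makes the result independent of the order in which consumers and partitions arrive in the request.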
Request: {"type": "consumer_rebalance", "msg_id": 1, "group": "analytics", "consumers": ["c1", "c2", "c3"], "partitions": [0, 1, 2, 3, 4, 5], "strategy": "range"}
Response: {"type": "consumer_rebalance_ok", "in_reply_to": 1, "assignments": {"c1": [0, 1], "c2": [2, 3], "c3": [4, 5]}}
Request: {"type": "consumer_rebalance", "msg_id": 2, "group": "analytics", "consumers": ["c1", "c2"], "partitions": [0, 1, 2, 3, 4, 5], "strategy": "range"}
Response: {"type": "consumer_rebalance_ok", "in_reply_to": 2, "assignments": {"c1": [0, 1, 2], "c2": [3, 4, 5]}}
Sample Test Cases
Even distribution: 6 partitions, 3 consumers
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"consumer_rebalance","msg_id":2,"group":"g1","consumers":["c1","c2","c3"],"partitions":[0,1,2,3,4,5],"strategy":"range"}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "consumer_rebalance_ok", "in_reply_to": 2, "assignments": {"c1": [0, 1], "c2": [2, 3], "c3": [4, 5]}, "msg_id": 1}}
Redistribution after consumer leaves: 6 partitions, 2 consumers
Timeout: 5000ms
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"consumer_rebalance","msg_id":2,"group":"g1","consumers":["c1","c2"],"partitions":[0,1,2,3,4,5],"strategy":"range"}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "consumer_rebalance_ok", "in_reply_to": 2, "assignments": {"c1": [0, 1, 2], "c2": [3, 4, 5]}, "msg_id": 1}}
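The sample cases above suggest a node that answers `init` and `consumer_rebalance` with replies whose `msg_id` counts up from 0. The sketch below is one way that could look. `Node`, `handle`, and `range_assign` are hypothetical names, the reply key order is chosen only to mirror the expected output, and the code assumes the `strategy` field is always `"range"`.

```python
import json


def range_assign(consumers, partitions):
    """Hypothetical helper: contiguous ranges, extras go to the first consumers."""
    consumers, partitions = sorted(consumers), sorted(partitions)
    if not consumers:
        return {}
    base, extra = divmod(len(partitions), len(consumers))
    bounds = [i * base + min(i, extra) for i in range(len(consumers) + 1)]
    return {c: partitions[bounds[i]:bounds[i + 1]] for i, c in enumerate(consumers)}


class Node:
    def __init__(self):
        self.node_id = None
        self.next_msg_id = 0  # the samples show reply msg_ids starting at 0

    def handle(self, msg):
        body = msg["body"]
        if body["type"] == "init":
            self.node_id = body["node_id"]
            reply = {"type": "init_ok", "in_reply_to": body["msg_id"]}
        elif body["type"] == "consumer_rebalance":
            # Assumption: only the "range" strategy is requested.
            reply = {
                "type": "consumer_rebalance_ok",
                "in_reply_to": body["msg_id"],
                "assignments": range_assign(body["consumers"], body["partitions"]),
            }
        else:
            return None  # message types outside the samples are ignored here
        reply["msg_id"] = self.next_msg_id
        self.next_msg_id += 1
        return {"src": self.node_id, "dest": msg["src"], "body": reply}


sample_input = [
    '{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}',
    '{"src":"c1","dest":"n1","body":{"type":"consumer_rebalance","msg_id":2,"group":"g1",'
    '"consumers":["c1","c2"],"partitions":[0,1,2,3,4,5],"strategy":"range"}}',
]
node = Node()
for line in sample_input:
    print(json.dumps(node.handle(json.loads(line))), flush=True)
```

In a real submission the loop would read from `sys.stdin` instead of `sample_input`.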
Hints
Hint 1
When a consumer joins or leaves a group, all partition assignments must be recalculated
Hint 2
Range strategy: sort consumers and partitions, divide partitions into contiguous ranges per consumer
Hint 3
During rebalancing, all consumers in the group pause consumption briefly (stop-the-world)
Hint 4
The group coordinator (a broker) manages the rebalancing protocol
Hint 5
Uneven distribution: with 6 partitions and 4 consumers, the first two consumers in sorted order get 2 partitions each and the last two get 1 each
OVERVIEW
Key Concepts
consumer group, rebalancing, partition assignment, range strategy, group coordinator
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()