TASK
Implementation
Implement Kafka-style consumer groups:
- Topic has multiple partitions
- Messages with same key go to same partition
- Consumer group: each partition assigned to one consumer
- Multiple groups each see all messages
- Rebalance partitions when consumers change
This enables parallel consumption while maintaining per-key ordering.
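As a rough illustration of the key-to-partition rule above, here is a minimal sketch. It assumes string keys and uses `zlib.crc32` as a stand-in hash. That choice is an assumption of this example, not something the task prescribes. Python's built-in `hash()` on strings is randomized per process unless `PYTHONHASHSEED` is set, so it is only stable within a single run.

```python
import zlib

def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition index using a process-independent hash."""
    # crc32 returns the same value on every run, so a key always lands
    # on the same partition even after a restart.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Hypothetical 4-partition topic: each partition is an append-only list.
partitions = [[] for _ in range(4)]
for key, value in [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]:
    partitions[partition_for_key(key, 4)].append((key, value))

# Both "user-1" messages sit in one partition, in the order they were sent.
p = partition_for_key("user-1", 4)
user1_values = [v for k, v in partitions[p] if k == "user-1"]
```

Because all messages for a key share one partition, and a partition is read by one consumer per group, per-key ordering follows from append order.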
Sample Test Cases
Parallel consumption
Timeout: 5000ms
Input
{
  "src": "c0",
  "dest": "n1",
  "body": {
    "type": "init",
    "msg_id": 1,
    "node_id": "n1",
    "node_ids": [
      "n1"
    ]
  }
}
Expected Output
{"src":"n1","dest":"c0","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}
Hints
Hint 1: Partition messages by key
Hint 2: Each partition assigned to one consumer per group
Hint 3: Rebalance on consumer join/leave
OVERVIEW
Theoretical Hub
Consumer Groups
Kafka pioneered consumer groups. Within a group, partitions are divided among the consumers so that they can be processed in parallel. Each group independently consumes every message, which gives publish-subscribe semantics across groups and load balancing within a group.
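The independence of groups can be sketched with one offset per group over a shared log. This is a simplified illustration with a single partition and hypothetical group names. Unlike the `poll`/`commit` split in the starter code, it advances the offset inside `poll` to keep the example short.

```python
from collections import defaultdict

log = ["m0", "m1", "m2"]      # one partition, append-only
offsets = defaultdict(int)    # group_id -> next offset to read

def poll(group_id, max_messages=10):
    """Return the next batch for a group and advance only that group's offset."""
    start = offsets[group_id]
    batch = log[start:start + max_messages]
    offsets[group_id] = start + len(batch)
    return batch

analytics_first = poll("analytics", 2)   # ["m0", "m1"]
billing_all = poll("billing")            # ["m0", "m1", "m2"]
analytics_rest = poll("analytics")       # ["m2"]
```

Reading from `billing` does not move the `analytics` offset, so each group sees the full stream at its own pace.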
Partition Assignment
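When consumers join or leave a group, its partitions must be reassigned. Cooperative rebalancing reduces disruption by moving only the partitions whose owner changes, rather than revoking every assignment. The partition count caps parallelism within a group: consumers beyond the number of partitions sit idle, so choose the partition count with the expected number of consumers in mind.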
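One simple way to reassign partitions is to recompute a round-robin assignment over the sorted consumer ids whenever membership changes. The sketch below assumes that strategy and a hypothetical `assign` helper. It is an eager, full recomputation, not cooperative rebalancing, and it is not the only valid strategy for this task.

```python
def assign(num_partitions, consumers):
    """Round-robin partitions over consumer ids, sorted for determinism."""
    members = sorted(consumers)
    assignment = {c: [] for c in members}
    if not members:
        return assignment
    for p in range(num_partitions):
        assignment[members[p % len(members)]].append(p)
    return assignment

before = assign(4, {"c1", "c2"})        # {"c1": [0, 2], "c2": [1, 3]}
after = assign(4, {"c1", "c2", "c3"})   # {"c1": [0, 3], "c2": [1], "c3": [2]}
idle = assign(2, {"c1", "c2", "c3"})    # "c3" gets no partition
```

The last call shows the parallelism cap: with two partitions, a third consumer receives an empty assignment.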
Key Concepts
consumer groups, partitioning, parallel processing
main.py
python
#!/usr/bin/env python3
import sys
import json
import threading
from collections import defaultdict


class PartitionedQueue:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.partitions = [[] for _ in range(num_partitions)]
        self.offsets = defaultdict(dict)  # group -> partition -> offset
        self.assignments = {}  # group -> {consumer -> [partitions]}
        self.lock = threading.Lock()

    def partition_for_key(self, key):
        return hash(key) % self.num_partitions

    def send(self, key, message):
        # TODO: Append to appropriate partition
        pass

    def subscribe(self, group_id, consumer_id):
        # TODO: Add consumer to group, rebalance
        pass

    def unsubscribe(self, group_id, consumer_id):
        # TODO: Remove consumer, rebalance
        pass

    def poll(self, group_id, consumer_id, max_messages=10):
        # TODO: Get messages from assigned partitions
        pass

    def commit(self, group_id, partition, offset):
        # TODO: Record consumption progress
        pass


if __name__ == "__main__":
    pass
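The sample test case only exercises the `init` handshake, which the stub's empty `__main__` block does not yet answer. The following sketch shows one way to do it. It assumes requests arrive as one JSON object per line on stdin and replies go to stdout. The page does not state the transport, and `handle`, `state`, and `main` are hypothetical names for this illustration.

```python
import json
import sys

def handle(request, state):
    """Build a reply for a request, or return None if the type is unhandled."""
    body = request["body"]
    if body["type"] == "init":
        state["node_id"] = body["node_id"]
        reply = {
            "type": "init_ok",
            "in_reply_to": body["msg_id"],
            "msg_id": state["next_msg_id"],  # first reply uses msg_id 0
        }
        state["next_msg_id"] += 1
        return {"src": state["node_id"], "dest": request["src"], "body": reply}
    return None

def main():
    # Assumed transport: newline-delimited JSON on stdin, replies on stdout.
    state = {"node_id": None, "next_msg_id": 0}
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        response = handle(json.loads(line), state)
        if response is not None:
            print(json.dumps(response, separators=(",", ":")), flush=True)
```

Calling `main()` from the `__main__` guard would start the loop. Other message types, such as send, subscribe, poll, and commit, would be dispatched from `handle` to a `PartitionedQueue` instance once their wire format is known.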