TASK
Implementation
A single-broker job queue is both a bottleneck and a single point of failure. A distributed queue partitions jobs across multiple brokers and replicates each partition so no broker failure loses any jobs.
Implement a node that manages a partitioned, replicated job queue:
// Initialize: 3 partitions, replicated to 2 brokers each
{ "type": "init", "msg_id": 1,
"partitions": 3, "replication_factor": 2 }
-> { "type": "init_ok", "in_reply_to": 1 }
// Push jobs — assigned to partitions by hash(job_id) % 3
{ "type": "push_job", "msg_id": 2,
"jobs": [{"id":"j1"},{"id":"j2"},{"id":"j3"},
{"id":"j4"},{"id":"j5"},{"id":"j6"}] }
// Worker pops next job from its assigned partition
{ "type": "pop_job", "msg_id": 3,
"consumer_id": "w1", "partitions": ["p1","p2","p3"] }
-> { "type": "job_assigned", "in_reply_to": 3,
"job": {}, "partition": "p1" }
// Add new brokers -> rebalance partitions
{ "type": "rebalance_partitions", "msg_id": 4,
"new_brokers": ["broker4","broker5"],
"target_partitions_per_broker": 1 }
-> { "type": "rebalance_complete", "in_reply_to": 4,
"migrations": [{"partition":"p2","from":"broker1","to":"broker4"}] }Sample Test Cases
Partition job distributionTimeout: 5000ms
Input
{"src":"producer","dest":"queue","body":{"type":"init","msg_id":1,"partitions":3,"replication_factor":2}}
{"src":"producer","dest":"queue","body":{"type":"push_job","msg_id":2,"jobs":[{"id":"j1"},{"id":"j2"},{"id":"j3"},{"id":"j4"},{"id":"j5"},{"id":"j6"}]}}
Expected Output
{"src": "queue", "dest": "producer", "body": {"type": "init_ok", "in_reply_to": 1}}
Consumer pulls from partitionsTimeout: 5000ms
Input
{
"src": "worker",
"dest": "queue",
"body": {
"type": "pop_job",
"msg_id": 1,
"consumer_id": "w1",
"partitions": [
"p1",
"p2",
"p3"
]
}
}Expected Output
{"src": "queue", "dest": "worker", "body": {"type": "job_assigned", "in_reply_to": 1, "job": {}, "partition": "p1"}}Hints
Hint 1▾
Assign jobs to partitions using hash(job_id) % num_partitions
Hint 2▾
A consumer owns a partition exclusively; only one consumer pops from a partition at a time
Hint 3▾
Rebalancing moves partitions from over-loaded brokers to under-loaded ones
Hint 4▾
On primary broker failure, a replica promotes to primary and resumes serving without job loss
Hint 5▾
Replication factor=2 means every partition has one primary and one replica
OVERVIEW
Theoretical Hub
Concept overview coming soon
Key Concepts
partitioned queuereplicationconsumer assignmentpartition rebalancingbroker failover
main.py
python
1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python3
import sys
import json
def main():
# Your implementation here
for line in sys.stdin:
msg = json.loads(line)
print(json.dumps(msg), flush=True)
if __name__ == "__main__":
main()