TASK
Implementation
Consumer offset tracking enables consumers to read at their own pace and resume after restarts. Each consumer group independently tracks its position in each partition.
The offset lifecycle:
- Consumer starts: calls fetch_offset to find where it last left off.
- Consumer reads: fetches messages starting from its offset.
- Consumer processes: performs application logic on the messages.
- Consumer commits: calls commit_offset to save its new position.
- Consumer crashes: on restart, calls fetch_offset again and resumes from the last committed offset.
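This gives at-least-once delivery: if a consumer crashes after processing but before committing, it re-processes those messages on restart. Exactly-once processing needs additional mechanisms. Examples include making the processing idempotent, or committing the offset atomically together with the processing results.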
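The crash window can be made concrete with a toy simulation in plain Python. Everything in it is hypothetical and exists only for illustration: the `partition` list, the `committed` dict, `run_consumer` and its `crash_before_commit` flag. It also assumes that the committed offset is the next offset to read. That is Kafka's convention, but this exercise does not specify it.

```python
# Toy model of one partition and one consumer group (hypothetical names).
partition = ["m0", "m1", "m2", "m3"]  # messages at offsets 0..3
committed = {}                        # group -> next offset to read (assumed convention)


def fetch_offset(group):
    # A group that has never committed starts at offset 0.
    return committed.get(group, 0)


def commit_offset(group, offset):
    committed[group] = offset


def run_consumer(group, processed, max_batch, crash_before_commit=False):
    start = fetch_offset(group)                   # 1. find where we left off
    batch = partition[start:start + max_batch]    # 2. read
    processed.extend(batch)                       # 3. "process" (record what was handled)
    if crash_before_commit:
        return                                    # simulated crash: processed, never committed
    commit_offset(group, start + len(batch))      # 4. commit the new position


processed = []
run_consumer("g", processed, 2)                            # handles m0, m1; commits 2
run_consumer("g", processed, 2, crash_before_commit=True)  # handles m2, m3; crashes
run_consumer("g", processed, 2)                            # restart: m2, m3 again; commits 4
print(processed)  # ['m0', 'm1', 'm2', 'm3', 'm2', 'm3']
```

The duplicated `m2`, `m3` are exactly the at-least-once re-processing described above. No message is lost, but some are handled twice.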
Multiple consumer groups can read the same partition independently at different speeds — a key Kafka design principle.
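Groups stay independent because each committed offset is keyed by the full (group, topic, partition) triple. Below is a minimal in-memory sketch. The `OffsetStore` class is hypothetical. Returning 0 for a group that has never committed is taken from the billing example below.

```python
class OffsetStore:
    """Hypothetical in-memory store: one committed offset per (group, topic, partition)."""

    def __init__(self):
        self._offsets = {}

    def commit(self, group, topic, partition, offset):
        # Overwrites only this group's bookmark; other groups are untouched.
        self._offsets[(group, topic, partition)] = offset

    def fetch(self, group, topic, partition):
        # Default of 0 for a group with no commit yet.
        return self._offsets.get((group, topic, partition), 0)


store = OffsetStore()
store.commit("fast", "t1", 0, 100)
store.commit("slow", "t1", 0, 5)
print(store.fetch("fast", "t1", 0), store.fetch("slow", "t1", 0), store.fetch("new", "t1", 0))
# 100 5 0
```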
Request: {"type": "commit_offset", "msg_id": 1, "group": "analytics", "topic": "orders", "partition": 0, "offset": 42}
Response: {"type": "commit_offset_ok", "in_reply_to": 1}
Request: {"type": "fetch_offset", "msg_id": 2, "group": "analytics", "topic": "orders", "partition": 0}
Response: {"type": "fetch_offset_ok", "in_reply_to": 2, "offset": 42}
Request: {"type": "fetch_offset", "msg_id": 3, "group": "billing", "topic": "orders", "partition": 0}
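Response: {"type": "fetch_offset_ok", "in_reply_to": 3, "offset": 0}
A group that has never committed an offset for a partition (billing in the last request) gets offset 0.
Sample Test Cases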
Commit and fetch offset roundtrip (timeout: 5000ms)
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"commit_offset","msg_id":2,"group":"g1","topic":"t1","partition":0,"offset":10}}
{"src":"c1","dest":"n1","body":{"type":"fetch_offset","msg_id":3,"group":"g1","topic":"t1","partition":0}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "commit_offset_ok", "in_reply_to": 2, "msg_id": 1}}
{"src": "n1", "dest": "c1", "body": {"type": "fetch_offset_ok", "in_reply_to": 3, "offset": 10, "msg_id": 2}}
Different groups track independent offsets (timeout: 5000ms)
Input
{"src":"c0","dest":"n1","body":{"type":"init","msg_id":1,"node_id":"n1","node_ids":["n1"]}}
{"src":"c1","dest":"n1","body":{"type":"commit_offset","msg_id":2,"group":"fast","topic":"t1","partition":0,"offset":100}}
{"src":"c1","dest":"n1","body":{"type":"commit_offset","msg_id":3,"group":"slow","topic":"t1","partition":0,"offset":5}}
{"src":"c1","dest":"n1","body":{"type":"fetch_offset","msg_id":4,"group":"fast","topic":"t1","partition":0}}
{"src":"c1","dest":"n1","body":{"type":"fetch_offset","msg_id":5,"group":"slow","topic":"t1","partition":0}}
Expected Output
{"src": "n1", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
{"src": "n1", "dest": "c1", "body": {"type": "commit_offset_ok", "in_reply_to": 2, "msg_id": 1}}
{"src": "n1", "dest": "c1", "body": {"type": "commit_offset_ok", "in_reply_to": 3, "msg_id": 2}}
{"src": "n1", "dest": "c1", "body": {"type": "fetch_offset_ok", "in_reply_to": 4, "offset": 100, "msg_id": 3}}
{"src": "n1", "dest": "c1", "body": {"type": "fetch_offset_ok", "in_reply_to": 5, "offset": 5, "msg_id": 4}}
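The sample cases above can be reproduced with a small stdin/stdout node. This is a minimal sketch, not a reference solution. The `Node` class, the `run` generator, and the choice to ignore unknown message types are all assumptions. Two details are inferred from the expected output: the outgoing `msg_id` counter starts at 0, and each reply body has a particular key order. `json.dumps` with its default separators reproduces that exact text.

```python
#!/usr/bin/env python3
import json
import sys


class Node:
    """Hypothetical single-node offset service for the sample test cases."""

    def __init__(self):
        self.node_id = None
        self.next_msg_id = 0  # the expected output numbers replies 0, 1, 2, ...
        self.offsets = {}     # (group, topic, partition) -> committed offset

    def reply(self, request, reply_type, **fields):
        # Key order mirrors the expected output: type, in_reply_to, extra fields, msg_id.
        body = {"type": reply_type, "in_reply_to": request["body"]["msg_id"], **fields}
        body["msg_id"] = self.next_msg_id
        self.next_msg_id += 1
        return {"src": self.node_id, "dest": request["src"], "body": body}

    def handle(self, msg):
        body = msg["body"]
        kind = body["type"]
        if kind == "init":
            self.node_id = body["node_id"]
            return self.reply(msg, "init_ok")
        if kind in ("commit_offset", "fetch_offset"):
            key = (body["group"], body["topic"], body["partition"])
            if kind == "commit_offset":
                self.offsets[key] = body["offset"]
                return self.reply(msg, "commit_offset_ok")
            # A group that never committed for this partition gets offset 0.
            return self.reply(msg, "fetch_offset_ok", offset=self.offsets.get(key, 0))
        return None  # this sketch silently ignores other message types


def run(lines):
    """Lazily yield one JSON reply per handled input line."""
    node = Node()
    for line in lines:
        if not line.strip():
            continue
        response = node.handle(json.loads(line))
        if response is not None:
            yield json.dumps(response)


def main():
    for out in run(sys.stdin):
        print(out, flush=True)


if __name__ == "__main__":
    main()
```

Because `run` is a generator, `main` keeps the starter template's line-by-line loop. The same logic also stays testable on a plain list of strings.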
Hints
Hint 1
Each consumer group maintains an independent offset per partition (their "bookmark")
Hint 2
commit_offset: client saves its current position after processing messages
Hint 3
fetch_offset: retrieve where the consumer last left off (for resuming after restart)
Hint 4
If a consumer crashes before committing, it re-reads from the last committed offset (at-least-once)
Hint 5
In real Kafka, offsets are stored in a special internal topic (__consumer_offsets)
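__consumer_offsets is a log-compacted topic. Committed offsets can therefore be pictured as an append-only log in which the newest record for each key wins. The sketch below models only that idea, using hypothetical `commit`, `rebuild` and `compact` helpers over a Python list. It does not model the real topic's record format, keys or partitioning.

```python
# Hypothetical offsets log: every commit appends ((group, topic, partition), offset).
log = []


def commit(group, topic, partition, offset):
    log.append(((group, topic, partition), offset))


def rebuild():
    """Replay the log from the start; later records overwrite earlier ones."""
    latest = {}
    for key, offset in log:
        latest[key] = offset
    return latest


def compact():
    """Keep only the newest record per key, preserving the order of survivors."""
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [record for i, record in enumerate(log) if last_index[record[0]] == i]


commit("g1", "t1", 0, 10)
commit("g1", "t1", 0, 20)  # supersedes the first record for g1
commit("g2", "t1", 0, 7)
print(rebuild())  # {('g1', 't1', 0): 20, ('g2', 't1', 0): 7}
print(compact())  # [(('g1', 't1', 0), 20), (('g2', 't1', 0), 7)]
```

Compaction discards superseded records without changing what a replay recovers. That property is why a restarted broker can rebuild every group's position from the compacted log.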
Key Concepts
consumer offset, consumer group, commit offset, fetch offset, at-least-once delivery
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    # Placeholder: echoes each input message back unchanged.
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()