TASK
Implementation
Achieve exactly-once processing semantics:
Producer side:
- Assign a unique ID to each message.
- The queue deduplicates messages by ID.
Consumer side:
- Track the IDs of processed messages.
- Skip messages that have already been processed.
- Atomically process the message, commit the offset, and record the message ID as processed.
This requires cooperation between the producer, the queue, and the consumer.
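The atomic consumer-side step can be sketched with SQLite from Python's standard library. The result, the processed-ID marker, and the next offset all live in one database, so a single transaction commits all three or none. The table names and the `open_store`/`process_once` helpers are hypothetical. The handler is assumed to be pure: its only effect is the returned value, which is stored inside the same transaction. A handler with external side effects would need those effects to be idempotent as well.

```python
import sqlite3

# Hypothetical schema: results, processed IDs and the consumer offset share
# one database so that one transaction can update all of them together.
SCHEMA = """
CREATE TABLE IF NOT EXISTS results   (message_id TEXT PRIMARY KEY, value TEXT);
CREATE TABLE IF NOT EXISTS processed (message_id TEXT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS offsets   (consumer TEXT PRIMARY KEY, next_offset INTEGER);
"""


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def process_once(conn, consumer, offset, message_id, payload, handler):
    """Run `handler` at most once per message ID and commit atomically.

    Returns True if the handler ran, False if the message was a duplicate.
    """
    already = conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    ).fetchone()
    if already:
        # Duplicate delivery: skip the handler but still move the offset on.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?)", (consumer, offset + 1)
            )
        return False

    result = handler(payload)
    # The connection context manager commits on success and rolls back if any
    # statement raises, so result, processed marker and offset move together.
    with conn:
        conn.execute("INSERT INTO results VALUES (?, ?)", (message_id, result))
        conn.execute("INSERT INTO processed VALUES (?)", (message_id,))
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?)", (consumer, offset + 1)
        )
    return True
```

If the process crashes after the handler returns but before the commit, nothing is recorded and the message is simply processed again on restart; this is safe here only because the handler's output is written inside the transaction.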
Sample Test Cases
Producer deduplication
Timeout: 5000ms
Input
{
  "src": "c0",
  "dest": "n1",
  "body": {
    "type": "init",
    "msg_id": 1,
    "node_id": "n1",
    "node_ids": [
      "n1"
    ]
  }
}
Expected Output
{"src":"n1","dest":"c0","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}

Consumer skip duplicate
Timeout: 5000ms
Input
{
  "src": "c0",
  "dest": "n1",
  "body": {
    "type": "init",
    "msg_id": 1,
    "node_id": "n1",
    "node_ids": [
      "n1"
    ]
  }
}
Expected Output
{"src":"n1","dest":"c0","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}

Hints
Hint 1
Deduplicate on the producer side using the message ID.
Hint 2
Track processed message IDs on the consumer side.
Hint 3
Use transactions so that consuming, producing, and committing the offset happen atomically.
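Both sample test cases only exercise the `init` handshake: the node records its `node_id` and answers with `init_ok`, echoing the request's `msg_id` as `in_reply_to`. The sketch below handles just that. The `Node` class is hypothetical; it assumes requests arrive on a text stream (possibly pretty-printed across several lines, as in the samples), writes replies as compact single-line JSON, and numbers its own messages from 0 to match the expected output.

```python
import json
import sys


class Node:
    """Minimal node for the JSON envelope used in the sample tests.

    Hypothetical sketch: only `init` is handled, because that is all the
    sample cases exercise.
    """

    def __init__(self, out=sys.stdout):
        self.out = out
        self.node_id = None
        self.node_ids = []
        self.next_msg_id = 0  # the expected reply carries msg_id 0

    def reply(self, request, body):
        body = dict(body)
        body["in_reply_to"] = request["body"]["msg_id"]
        body["msg_id"] = self.next_msg_id
        self.next_msg_id += 1
        msg = {"src": self.node_id, "dest": request["src"], "body": body}
        # Compact separators reproduce the expected single-line output.
        self.out.write(json.dumps(msg, separators=(",", ":")) + "\n")
        self.out.flush()

    def handle(self, request):
        body = request["body"]
        if body["type"] == "init":
            self.node_id = body["node_id"]
            self.node_ids = body["node_ids"]
            self.reply(request, {"type": "init_ok"})

    def run(self, stream):
        # Buffer lines until they parse as one complete JSON message, so both
        # one-message-per-line and pretty-printed input are accepted.
        buf = ""
        for line in stream:
            buf += line
            if not buf.strip():
                buf = ""
                continue
            try:
                request = json.loads(buf)
            except json.JSONDecodeError:
                continue  # message not complete yet
            buf = ""
            self.handle(request)
```

Under the starter's `if __name__ == "__main__":` guard this would be driven by `Node().run(sys.stdin)`. Message types for sending and consuming would be further branches in `handle`; the samples do not show their format.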
OVERVIEW
Theoretical Hub
Exactly-Once Semantics
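True exactly-once semantics are end-to-end: they require exactly-once production, exactly-once consumption, and idempotent processing. Kafka provides this for read-process-write pipelines that stay within Kafka through idempotent producers, transactions that commit the consumer's offsets together with its output (sendOffsetsToTransaction), and consumers that read with isolation.level=read_committed.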
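The idempotent-producer part of this paragraph can be illustrated with a toy model. In Kafka, each producer has a producer ID and numbers its writes with a per-partition sequence number, so the broker can drop a retried write it has already appended and detect a gap. The `Partition`, `IdempotentProducer`, and `OutOfOrderSequence` names below are hypothetical; the model ignores batching, producer epochs, and persistence, and is not Kafka's API.

```python
from dataclasses import dataclass, field


class OutOfOrderSequence(Exception):
    """Raised when a producer skips ahead in its sequence (a lost write)."""


@dataclass
class Partition:
    """Toy broker partition that deduplicates retries per producer ID."""

    log: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # producer_id -> last seq

    def append(self, producer_id, seq, record):
        expected = self.last_seq.get(producer_id, -1) + 1
        if seq < expected:
            return False  # retry of a write already in the log: drop it
        if seq > expected:
            raise OutOfOrderSequence(f"expected {expected}, got {seq}")
        self.log.append(record)
        self.last_seq[producer_id] = seq
        return True


class IdempotentProducer:
    def __init__(self, producer_id, partition):
        self.producer_id = producer_id
        self.partition = partition
        self.next_seq = 0

    def send(self, record):
        """Assign the next sequence number, deliver, and return that number."""
        seq = self.next_seq
        self.next_seq += 1
        self.partition.append(self.producer_id, seq, record)
        return seq

    def resend(self, seq, record):
        """Simulate a retry after a lost ack: same sequence number, same record."""
        return self.partition.append(self.producer_id, seq, record)
```

Because a retry reuses its original sequence number, the broker needs only one integer per producer rather than a set of every message ID.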
Idempotency Keys
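With unique message IDs, producers can retry safely (the queue rejects duplicates) and consumers can skip messages they have already processed. The challenge is tracking these IDs efficiently and garbage-collecting them: the set of seen IDs cannot grow forever, so it is usually bounded by a time or count window, and a duplicate that arrives after its ID has been evicted is no longer caught.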
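To make the garbage-collection problem concrete, here is a sketch of a dedup set with time-based expiry. `ExpiringIdSet` and its injectable `clock` parameter are hypothetical names. Because every ID gets the same TTL and the clock is monotonic, insertion order equals expiry order, so pruning only ever inspects the oldest entries.

```python
import time
from collections import OrderedDict


class ExpiringIdSet:
    """Remembers message IDs for `ttl` seconds, then forgets them."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._expires = OrderedDict()  # message_id -> expiry time, oldest first

    def _prune(self, now):
        # Oldest entries expire first, so stop at the first live one.
        while self._expires and next(iter(self._expires.values())) <= now:
            self._expires.popitem(last=False)

    def add_if_new(self, message_id):
        """Return True and remember the ID if unseen, False if a duplicate."""
        now = self.clock()
        self._prune(now)
        if message_id in self._expires:
            return False
        self._expires[message_id] = now + self.ttl
        return True

    def __len__(self):
        return len(self._expires)
```

The TTL has to exceed the longest interval over which a producer may retry; a retry that arrives after its ID expired is accepted as a new message.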
Key Concepts
exactly-once, idempotency, deduplication
main.py
#!/usr/bin/env python3
import sys
import json
import threading


class ExactlyOnceQueue:
    def __init__(self, dedup_window=10000):
        self.messages = []       # accepted messages, in append order
        self.seen_ids = set()    # IDs seen within the dedup window
        self.dedup_window = dedup_window
        self.lock = threading.Lock()

    def send(self, message_id, message):
        # TODO: Deduplicate by message ID
        pass


class ExactlyOnceConsumer:
    def __init__(self, queue, processed_store):
        self.queue = queue
        self.processed = processed_store  # Persistent set
        self.offset = 0

    def poll_and_process(self, handler):
        # TODO: Get message, check if processed
        # TODO: Process, then atomically commit
        pass

    def _is_processed(self, message_id):
        return message_id in self.processed

    def _mark_processed(self, message_id, offset):
        # Record the ID and advance the offset; these two updates must commit together.
        self.processed.add(message_id)
        self.offset = offset


if __name__ == "__main__":
    pass
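One possible way to fill in the starter's TODOs, under stated assumptions: the queue forgets IDs with a count-based window of `dedup_window` entries, a hypothetical `read(offset)` method exposes the log to consumers, and `_mark_processed` receives the next offset to read. A `threading.Lock` stands in for the atomic commit, which only holds within a single process.

```python
import threading
from collections import deque


class ExactlyOnceQueue:
    def __init__(self, dedup_window=10000):
        self.messages = []          # (message_id, message) in append order
        self.seen_ids = set()
        self._seen_order = deque()  # oldest ID first, for eviction
        self.dedup_window = dedup_window
        self.lock = threading.Lock()

    def send(self, message_id, message):
        """Append unless the ID is still in the window; True if appended."""
        with self.lock:
            if message_id in self.seen_ids:
                return False
            self.messages.append((message_id, message))
            self.seen_ids.add(message_id)
            self._seen_order.append(message_id)
            # Count-based GC: forget the oldest IDs once the window is full.
            while len(self._seen_order) > self.dedup_window:
                self.seen_ids.discard(self._seen_order.popleft())
            return True

    def read(self, offset):
        """Hypothetical accessor: (message_id, message) at offset, or None."""
        with self.lock:
            if offset < len(self.messages):
                return self.messages[offset]
            return None


class ExactlyOnceConsumer:
    def __init__(self, queue, processed_store):
        self.queue = queue
        self.processed = processed_store  # set-like; persistent in a real system
        self.offset = 0                   # next offset to read
        self.lock = threading.Lock()      # stands in for a real transaction

    def poll_and_process(self, handler):
        """True if the handler ran, False for a duplicate, None if empty."""
        entry = self.queue.read(self.offset)
        if entry is None:
            return None
        message_id, message = entry
        if self._is_processed(message_id):
            self._mark_processed(message_id, self.offset + 1)  # skip it
            return False
        handler(message)
        self._mark_processed(message_id, self.offset + 1)
        return True

    def _is_processed(self, message_id):
        return message_id in self.processed

    def _mark_processed(self, message_id, offset):
        # Record the ID and advance the offset as one step. The lock makes
        # this atomic only in-process; durability needs a real transaction.
        with self.lock:
            self.processed.add(message_id)
            self.offset = offset
```

The consumer-side check matters even though the queue deduplicates: a retry that arrives after its ID left the window gets appended again, and the consumer's processed set catches it. The handler still runs before the commit, so a crash between the two would replay the message; for true exactly-once the handler's effects must be committed together with the processed ID and offset, or be idempotent.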