TASK
Implementation
Exactly-once processing means each event affects the output exactly once, even when the system retries failed operations. It combines three mechanisms: deduplication (skip events already seen), checkpointing (save state so recovery can resume), and transactional output (commit results atomically).
Without exactly-once:
process "hello" -> count=1
(crash, retry)
process "hello" -> count=2 <- WRONG, counted twice
With exactly-once (deduplication):
process "hello" (id=e1) -> count=1, mark e1 seen
(crash, retry)
process "hello" (id=e1) -> skip (e1 already seen) -> count=1 <- still correct
Your node handles four message types:
// Process an event; skip if event_id was already seen
{ "type": "process", "msg_id": 1,
"event_id": "e1", "word": "hello" }
-> { "type": "processed", "in_reply_to": 1,
"word": "hello", "count": 1, "was_duplicate": false }
// Save current state as a named checkpoint
{ "type": "checkpoint", "msg_id": 2, "checkpoint_id": "cp1" }
-> { "type": "checkpoint_saved", "in_reply_to": 2, "checkpoint_id": "cp1" }
// Restore state from a checkpoint
{ "type": "restore", "msg_id": 3, "checkpoint_id": "cp1" }
-> { "type": "restored", "in_reply_to": 3,
"counts": {"hello": 1} }
// Commit pending outputs atomically
{ "type": "commit", "msg_id": 4 }
-> { "type": "committed", "in_reply_to": 4, "output_count": 1 }
Sample Test Cases
Idempotent processing
Timeout: 5000ms
Input
{"src":"stream","dest":"processor","body":{"type":"process","msg_id":1,"event_id":"e1","word":"hello"}}
{"src":"stream","dest":"processor","body":{"type":"process","msg_id":2,"event_id":"e1","word":"hello"}}
Expected Output
{"type": "processed", "in_reply_to": 1, "word": "hello", "count": 1, "was_duplicate": false}
{"type": "processed", "in_reply_to": 2, "word": "hello", "count": 1, "was_duplicate": true}
Checkpoint and restore
Timeout: 5000ms
Input
{"src":"stream","dest":"processor","body":{"type":"process","msg_id":1,"event_id":"e1","word":"hello"}}
{"src":"client","dest":"processor","body":{"type":"checkpoint","msg_id":2,"checkpoint_id":"cp1"}}
{"src":"client","dest":"processor","body":{"type":"restore","msg_id":3,"checkpoint_id":"cp1"}}
Expected Output
{"type": "processed", "in_reply_to": 1, "word": "hello", "count": 1, "was_duplicate": false}
{"type": "checkpoint_saved", "in_reply_to": 2, "checkpoint_id": "cp1"}
{"type": "restored", "in_reply_to": 3, "counts": {"hello": 1}}
Hints
Hint 1: Track processed event IDs in a set. A duplicate must not change the count, but it still gets a reply with was_duplicate set to true.
Hint 2: checkpoint saves the current count state so recovery can resume from it.
Hint 3: restore loads the checkpoint and replaces the current state.
Hint 4: commit moves pending outputs to committed atomically; rollback discards them.
Hint 5: At-least-once delivery + idempotent processing = effectively exactly-once.
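Taken together, the hints suggest a small state machine. The following is a minimal sketch in Python, not a reference solution. The class name ExactlyOnceProcessor and its handle method are hypothetical. Several behaviours are assumptions the task text does not spell out: a checkpoint snapshots the seen event IDs along with the counts, restore discards uncommitted outputs, each non-duplicate event produces one pending output, and an unknown checkpoint or message type gets an "error" reply.

```python
class ExactlyOnceProcessor:
    """Hypothetical in-memory node state for the four message types."""

    def __init__(self):
        self.counts = {}        # word -> number of distinct events seen
        self.seen = set()       # event IDs already applied to counts
        self.checkpoints = {}   # checkpoint_id -> (counts, seen) snapshot
        self.pending = []       # outputs produced since the last commit
        self.committed = []     # outputs made durable by commit

    def handle(self, body):
        kind = body["type"]
        reply = {"in_reply_to": body["msg_id"]}

        if kind == "process":
            word, event_id = body["word"], body["event_id"]
            duplicate = event_id in self.seen
            if not duplicate:
                # First sighting: apply the event exactly once.
                self.seen.add(event_id)
                self.counts[word] = self.counts.get(word, 0) + 1
                self.pending.append({"word": word, "count": self.counts[word]})
            return {"type": "processed", **reply, "word": word,
                    "count": self.counts.get(word, 0),
                    "was_duplicate": duplicate}

        if kind == "checkpoint":
            cp = body["checkpoint_id"]
            # Assumption: copy the seen IDs too, so that events processed
            # after the checkpoint can be replayed after a restore.
            self.checkpoints[cp] = (dict(self.counts), set(self.seen))
            return {"type": "checkpoint_saved", **reply, "checkpoint_id": cp}

        if kind == "restore":
            snapshot = self.checkpoints.get(body["checkpoint_id"])
            if snapshot is None:
                # Assumption: the task does not define this case.
                return {"type": "error", **reply, "text": "unknown checkpoint"}
            counts, seen = snapshot
            self.counts, self.seen = dict(counts), set(seen)
            self.pending.clear()  # assumption: uncommitted outputs are dropped
            return {"type": "restored", **reply, "counts": dict(self.counts)}

        if kind == "commit":
            # Swap the pending list out first so a commit never sees
            # a half-moved batch.
            batch, self.pending = self.pending, []
            self.committed.extend(batch)
            return {"type": "committed", **reply, "output_count": len(batch)}

        return {"type": "error", **reply, "text": "unknown message type"}


# Replaying the "Idempotent processing" sample:
node = ExactlyOnceProcessor()
first = node.handle({"type": "process", "msg_id": 1, "event_id": "e1", "word": "hello"})
retry = node.handle({"type": "process", "msg_id": 2, "event_id": "e1", "word": "hello"})
print(first["count"], retry["count"], retry["was_duplicate"])  # 1 1 True
```

Snapshotting the seen set is a design choice: if only the counts were restored, an event processed after the checkpoint would be skipped as a duplicate on replay and the restored count would stay too low.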
Key Concepts
exactly-once, idempotency, deduplication, checkpointing, transactional commits
main.py
python
#!/usr/bin/env python3
import sys
import json

def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)

if __name__ == "__main__":
    main()
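The starter echoes each input line unchanged, whereas the sample outputs contain only the reply body, without the src/dest envelope. One way to bridge that gap is sketched below, under assumptions: serve and ack are hypothetical names, the "ack" reply type is a placeholder that is not part of the task's protocol, and the real handler would return the replies shown in the samples. The streams are passed in as parameters so the loop can be exercised without real stdin.

```python
import io
import json


def serve(handle, lines, out):
    """Read one JSON envelope per line and write one reply body per line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        envelope = json.loads(line)
        reply = handle(envelope["body"])  # only the body carries the request
        # json.dumps defaults to ", " and ": " separators, the same spacing
        # the expected outputs use.
        out.write(json.dumps(reply) + "\n")
        out.flush()


def ack(body):
    # Hypothetical placeholder handler: acknowledges any message.
    return {"type": "ack", "in_reply_to": body["msg_id"]}


# Exercising the loop against an in-memory stream:
source = io.StringIO(
    '{"src":"client","dest":"processor","body":{"type":"commit","msg_id":4}}\n'
)
sink = io.StringIO()
serve(ack, source, sink)
print(sink.getvalue(), end="")  # {"type": "ack", "in_reply_to": 4}
```

In main.py the same loop would be called as serve(handler, sys.stdin, sys.stdout), with handler replaced by the real message logic.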