TASK
Implementation
An event store is an append-only log where every change to an aggregate is recorded as an immutable event. To reconstruct current state you replay all events in order — or start from a snapshot. Overwriting state is never allowed.
Implement a node that manages events for multiple aggregates with optimistic concurrency control: the caller declares which version they expect; if another writer already changed the aggregate, the append is rejected.
// Append an event (version must match current aggregate version)
{ "type": "append", "msg_id": 1,
"aggregate_id": "user-123", "event_type": "UserCreated",
"event_data": {"name": "John"}, "version": 0 }
-> { "type": "appended", "in_reply_to": 1,
"event_id": "<uuid>", "sequence_number": 1 }
// Version mismatch -> reject
{ "type": "append", "version": 5 } // actual version is 0
-> { "type": "concurrency_error", "in_reply_to": 1,
"error": "Expected version 5, got 0" }
// Save a snapshot at the aggregate's current version
{ "type": "create_snapshot", "msg_id": 3, "aggregate_id": "user-123" }
-> { "type": "snapshot_created", "in_reply_to": 3,
"aggregate_id": "user-123", "version": 1 }Each aggregate starts at version 0. Every successful append increments its version by 1 and advances the global sequence number.
Sample Test Cases
Append event to storeTimeout: 5000ms
Input
{
"src": "service",
"dest": "eventstore",
"body": {
"type": "append",
"msg_id": 1,
"aggregate_id": "user-123",
"event_type": "UserCreated",
"event_data": {
"name": "John"
},
"version": 0
}
}Expected Output
{"type": "appended", "in_reply_to": 1, "event_id": ".*", "sequence_number": 1}Detect concurrent modificationTimeout: 5000ms
Input
{
"src": "service",
"dest": "eventstore",
"body": {
"type": "append",
"msg_id": 1,
"aggregate_id": "user-123",
"event_type": "UserUpdated",
"event_data": {
"name": "Jane"
},
"version": 5
}
}Expected Output
{"type": "concurrency_error", "in_reply_to": 1, "error": "Expected version 5, got 0"}Hints
Hint 1▾
Events are immutable — never update or delete, only append
Hint 2▾
Track a version per aggregate; reject appends where the sent version does not match current
Hint 3▾
Sequence numbers are global and monotonically increasing across all aggregates
Hint 4▾
A snapshot stores the aggregate state at a specific version to speed up replay
Hint 5▾
Return a generated event_id and the new sequence_number on success
OVERVIEW
Theoretical Hub
Concept overview coming soon
Key Concepts
event sourcingappend-only logoptimistic concurrencyaggregatesequence numbersnapshot
main.py
python
1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python3
import sys
import json
def main():
# Your implementation here
for line in sys.stdin:
msg = json.loads(line)
print(json.dumps(msg), flush=True)
if __name__ == "__main__":
main()