TASK
Implementation
Event replay rebuilds an aggregate's state by applying all of its stored events in order. It is the core read mechanism in event sourcing: state is never stored directly, only derived by replaying history.
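Replay can be read as a fold (reduce) over the event list. The sketch below is a minimal illustration, not the reference solution; the event record shape (a `version` field for ordering and an `event_data` dict to merge) is an assumption, since the task does not specify how events are stored.

```python
from functools import reduce


def apply_event(state, event):
    """Return a new state dict with the event's data merged in."""
    # Assumed event shape: {"version": int, "event_data": dict}
    return {**state, **event["event_data"]}


def replay(events):
    """Fold events, ordered by version, into a single state dict."""
    ordered = sorted(events, key=lambda e: e["version"])
    return reduce(apply_event, ordered, {})


# Hypothetical events, deliberately stored out of order.
events = [
    {"version": 2, "event_data": {"name": "Jane"}},
    {"version": 1, "event_data": {"name": "John Doe", "email": "john@example.com"}},
]
state = replay(events)
print(state)  # {'name': 'Jane', 'email': 'john@example.com'}
```

Later events overwrite earlier values for the same key, which is why ordering matters.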
Implement a node that supports three replay modes:
// Full replay: apply all events to get current state
{ "type": "replay", "msg_id": 1, "aggregate_id": "user-123" }
-> { "type": "replayed", "in_reply_to": 1,
"aggregate_id": "user-123",
"events_replayed": 5, "state": {"name": "Jane"} }
// Snapshot-based replay: load snapshot, apply only events after its version
{ "type": "replay", "msg_id": 2,
"aggregate_id": "user-123", "use_snapshot": true }
-> { "type": "replayed", "in_reply_to": 2,
"aggregate_id": "user-123",
"snapshot_version": 100, "events_replayed": 2 }
// Temporal query: replay only events up to the given timestamp
{ "type": "get_state", "msg_id": 3,
"aggregate_id": "user-123", "timestamp": "2024-01-15T09:00:00Z" }
-> { "type": "state", "in_reply_to": 3,
"aggregate_id": "user-123",
"timestamp": "2024-01-15T09:00:00Z",
"state": {"name": "John Doe"} }
For snapshot-based replay, events_replayed counts only events applied after the snapshot version. Temporal queries ignore any event recorded after the requested timestamp.
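One way to picture the snapshot rule: start from the saved state and apply only events whose version is greater than the snapshot's. This is a sketch under assumed data shapes (`snapshot["version"]`, `snapshot["state"]`, and events carrying `version` and `event_data`); none of these field names are fixed by the task. The returned dict also carries `state` for inspection, although the sample response for this mode does not include it.

```python
def replay_from_snapshot(snapshot, events):
    """Apply only events newer than the snapshot on top of its saved state."""
    state = dict(snapshot["state"])  # copy so the snapshot itself is not mutated
    newer = sorted(
        (e for e in events if e["version"] > snapshot["version"]),
        key=lambda e: e["version"],
    )
    for event in newer:
        state.update(event["event_data"])
    return {
        "snapshot_version": snapshot["version"],
        "events_replayed": len(newer),  # counts post-snapshot events only
        "state": state,
    }


# Hypothetical snapshot and event log.
snapshot = {"version": 100, "state": {"name": "John Doe"}}
events = [
    {"version": 99, "event_data": {"name": "Old Name"}},
    {"version": 100, "event_data": {"name": "John Doe"}},
    {"version": 101, "event_data": {"email": "jane@example.com"}},
    {"version": 102, "event_data": {"name": "Jane"}},
]
result = replay_from_snapshot(snapshot, events)
print(result["snapshot_version"], result["events_replayed"])  # 100 2
```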
Sample Test Cases
Replay events to rebuild state
Timeout: 5000ms
Input
{
"src": "replayer",
"dest": "eventstore",
"body": {
"type": "replay",
"msg_id": 1,
"aggregate_id": "user-123"
}
}
Expected Output
{"type": "replayed", "in_reply_to": 1, "aggregate_id": "user-123", "events_replayed": 5, "state": {"name": "Jane"}}
Replay from snapshot
Timeout: 5000ms
Input
{
"src": "replayer",
"dest": "eventstore",
"body": {
"type": "replay",
"msg_id": 1,
"aggregate_id": "user-123",
"use_snapshot": true
}
}
Expected Output
{"type": "replayed", "in_reply_to": 1, "aggregate_id": "user-123", "snapshot_version": 100, "events_replayed": 2}
Hints
Hint 1
Replay applies each event in sequence order to rebuild state from scratch.
Hint 2
With a snapshot, start from the saved state and replay only events after that version.
Hint 3
A temporal query replays only events with timestamp <= the target time.
Hint 4
Merge each event_data dict into the running state dict on every replay step.
Hint 5
snapshot_version in the response is the version at which the snapshot was taken.
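Hints 3 and 4 together suggest a temporal query along the following lines. This is a hedged sketch: the per-event `timestamp` field (an ISO 8601 string), the `version` ordering key, and the helper names `parse_ts` and `state_at` are all assumptions made for illustration.

```python
from datetime import datetime


def parse_ts(value):
    """Parse an ISO 8601 timestamp; a trailing 'Z' is rewritten as +00:00."""
    # datetime.fromisoformat rejects the 'Z' suffix before Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def state_at(events, timestamp):
    """Replay only events recorded at or before the given timestamp."""
    cutoff = parse_ts(timestamp)
    state = {}
    for event in sorted(events, key=lambda e: e["version"]):
        if parse_ts(event["timestamp"]) <= cutoff:
            state.update(event["event_data"])  # merge event_data into state
    return state


# Hypothetical event log: the rename to "Jane" happens after the query time.
events = [
    {"version": 1, "timestamp": "2024-01-10T08:00:00Z",
     "event_data": {"name": "John Doe"}},
    {"version": 2, "timestamp": "2024-01-20T12:00:00Z",
     "event_data": {"name": "Jane"}},
]
print(state_at(events, "2024-01-15T09:00:00Z"))  # {'name': 'John Doe'}
```

Comparing parsed datetimes rather than raw strings avoids surprises if timestamps arrive with differing offsets or precision.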
OVERVIEW
Key Concepts
event replay, state reconstruction, snapshot-based replay, temporal query, fold over events
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()