ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/reactor/tasks/task-27-1-1-event-store
TASK

Implementation

An event store is an append-only log where every change to an aggregate is recorded as an immutable event. To reconstruct current state you replay all events in order — or start from a snapshot. Overwriting state is never allowed.

Implement a node that manages events for multiple aggregates with optimistic concurrency control: the caller declares which version they expect; if another writer already changed the aggregate, the append is rejected.

// Append an event (version must match current aggregate version)
{ "type": "append", "msg_id": 1,
  "aggregate_id": "user-123", "event_type": "UserCreated",
  "event_data": {"name": "John"}, "version": 0 }
-> { "type": "appended", "in_reply_to": 1,
    "event_id": "<uuid>", "sequence_number": 1 }

// Version mismatch -> reject
{ "type": "append", "version": 5 }   // actual version is 0
-> { "type": "concurrency_error", "in_reply_to": 1,
    "error": "Expected version 5, got 0" }

// Save a snapshot at the aggregate's current version
{ "type": "create_snapshot", "msg_id": 3, "aggregate_id": "user-123" }
-> { "type": "snapshot_created", "in_reply_to": 3,
    "aggregate_id": "user-123", "version": 1 }

Each aggregate starts at version 0. Every successful append increments its version by 1 and advances the global sequence number.

Sample Test Cases

Append event to storeTimeout: 5000ms
Input
{
  "src": "service",
  "dest": "eventstore",
  "body": {
    "type": "append",
    "msg_id": 1,
    "aggregate_id": "user-123",
    "event_type": "UserCreated",
    "event_data": {
      "name": "John"
    },
    "version": 0
  }
}
Expected Output
{"type": "appended", "in_reply_to": 1, "event_id": ".*", "sequence_number": 1}
Detect concurrent modificationTimeout: 5000ms
Input
{
  "src": "service",
  "dest": "eventstore",
  "body": {
    "type": "append",
    "msg_id": 1,
    "aggregate_id": "user-123",
    "event_type": "UserUpdated",
    "event_data": {
      "name": "Jane"
    },
    "version": 5
  }
}
Expected Output
{"type": "concurrency_error", "in_reply_to": 1, "error": "Expected version 5, got 0"}

Hints

Hint 1
Events are immutable — never update or delete, only append
Hint 2
Track a version per aggregate; reject appends where the sent version does not match current
Hint 3
Sequence numbers are global and monotonically increasing across all aggregates
Hint 4
A snapshot stores the aggregate state at a specific version to speed up replay
Hint 5
Return a generated event_id and the new sequence_number on success
OVERVIEW

Theoretical Hub

Concept overview coming soon

Key Concepts

event sourcingappend-only logoptimistic concurrencyaggregatesequence numbersnapshot
main.py
python
Implement Event Store - The Reactor | Build Distributed Systems