ARCHIVED from builddistributedsystem.com on 2026-04-28 — URL: https://builddistributedsystem.com/tracks/reactor/tasks/task-27-2-5-cqrs-event-sourcing
TASK

Implementation

CQRS and event sourcing are designed to work together: commands write to the event store (the source of truth), and projections built from those events serve queries. This gives you the full audit trail of event sourcing plus the read performance of CQRS.

Implement a node that integrates both patterns end-to-end:

// Command: validate, apply to aggregate, persist event, update projections
{ "type": "command", "msg_id": 1,
  "command": {"type": "CreateUser",
               "payload": {"id": "user-123", "name": "John"}} }
-> { "type": "command_result", "in_reply_to": 1,
    "success": true,
    "events": [{"eventType": "UserCreated", "aggregateId": "user-123"}] }

// Query: read from projection (never from event store)
{ "type": "query", "msg_id": 2,
  "query": {"type": "GetUser", "params": {"userId": "user-123"}} }
-> { "type": "query_result", "in_reply_to": 2,
    "data": {"id": "user-123", "name": "John"} }

// Temporal query: replay event store up to a point in time
{ "type": "query", "msg_id": 3,
  "query": {"type": "GetUserAtTime",
             "params": {"userId": "user-123",
                        "timestamp": "2024-01-15T09:00:00Z"}} }
-> { "type": "query_result", "in_reply_to": 3,
    "data": {"id": "user-123", "name": "John Doe",
             "at_time": "2024-01-15T09:00:00Z"} }

The flow for a command is: validate -> apply to aggregate -> append event to store -> update projections. The flow for a query is: read from projection (for current state) or replay event store (for past state).

Sample Test Cases

Execute command with event sourcingTimeout: 5000ms
Input
{
  "src": "client",
  "dest": "cqrs",
  "body": {
    "type": "command",
    "msg_id": 1,
    "command": {
      "type": "CreateUser",
      "payload": {
        "id": "user-123",
        "name": "John"
      }
    }
  }
}
Expected Output
{"type": "command_result", "in_reply_to": 1, "success": true, "events": [{"eventType": "UserCreated", "aggregateId": "user-123"}]}
Query from projectionTimeout: 5000ms
Input
{
  "src": "client",
  "dest": "cqrs",
  "body": {
    "type": "query",
    "msg_id": 1,
    "query": {
      "type": "GetUser",
      "params": {
        "userId": "user-123"
      }
    }
  }
}
Expected Output
{"type": "query_result", "in_reply_to": 1, "data": {"id": "user-123", "name": "John"}}

Hints

Hint 1
Commands go through validation, then are applied to the aggregate, which emits events
Hint 2
Events are stored in the event store and then applied to projections
Hint 3
Queries read from projections, not from the event store directly
Hint 4
Temporal queries replay the event store up to a given timestamp
Hint 5
The aggregate version must match the expected version on every command (optimistic locking)
OVERVIEW

Theoretical Hub

Concept overview coming soon

Key Concepts

CQRSevent sourcingaggregateprojectiontemporal queryfull-stack integration
main.py
python
Implement CQRS with Event Sourcing - The Reactor | Build Distributed Systems