TASK
Implementation
CQRS and event sourcing are designed to work together: commands write to the event store (the source of truth), and projections built from those events serve queries. This gives you the full audit trail of event sourcing plus the read performance of CQRS.
Implement a node that integrates both patterns end-to-end:
// Command: validate, apply to aggregate, persist event, update projections
{ "type": "command", "msg_id": 1,
"command": {"type": "CreateUser",
"payload": {"id": "user-123", "name": "John"}} }
-> { "type": "command_result", "in_reply_to": 1,
"success": true,
"events": [{"eventType": "UserCreated", "aggregateId": "user-123"}] }
// Query: read from projection (never from event store)
{ "type": "query", "msg_id": 2,
"query": {"type": "GetUser", "params": {"userId": "user-123"}} }
-> { "type": "query_result", "in_reply_to": 2,
"data": {"id": "user-123", "name": "John"} }
// Temporal query: replay event store up to a point in time
{ "type": "query", "msg_id": 3,
"query": {"type": "GetUserAtTime",
"params": {"userId": "user-123",
"timestamp": "2024-01-15T09:00:00Z"}} }
-> { "type": "query_result", "in_reply_to": 3,
"data": {"id": "user-123", "name": "John Doe",
"at_time": "2024-01-15T09:00:00Z"} }The flow for a command is: validate -> apply to aggregate -> append event to store -> update projections. The flow for a query is: read from projection (for current state) or replay event store (for past state).
Sample Test Cases
Execute command with event sourcingTimeout: 5000ms
Input
{
"src": "client",
"dest": "cqrs",
"body": {
"type": "command",
"msg_id": 1,
"command": {
"type": "CreateUser",
"payload": {
"id": "user-123",
"name": "John"
}
}
}
}Expected Output
{"type": "command_result", "in_reply_to": 1, "success": true, "events": [{"eventType": "UserCreated", "aggregateId": "user-123"}]}Query from projectionTimeout: 5000ms
Input
{
"src": "client",
"dest": "cqrs",
"body": {
"type": "query",
"msg_id": 1,
"query": {
"type": "GetUser",
"params": {
"userId": "user-123"
}
}
}
}Expected Output
{"type": "query_result", "in_reply_to": 1, "data": {"id": "user-123", "name": "John"}}Hints
Hint 1▾
Commands go through validation, then are applied to the aggregate, which emits events
Hint 2▾
Events are stored in the event store and then applied to projections
Hint 3▾
Queries read from projections, not from the event store directly
Hint 4▾
Temporal queries replay the event store up to a given timestamp
Hint 5▾
The aggregate version must match the expected version on every command (optimistic locking)
OVERVIEW
Theoretical Hub
Concept overview coming soon
Key Concepts
CQRSevent sourcingaggregateprojectiontemporal queryfull-stack integration
main.py
python
1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python3
import sys
import json
def main():
# Your implementation here
for line in sys.stdin:
msg = json.loads(line)
print(json.dumps(msg), flush=True)
if __name__ == "__main__":
main()