TASK
Implementation
In CQRS, the command side emits events and the query side must react to those events to keep its read models up-to-date. An event-driven projector subscribes to the event bus and updates one or more read models whenever a relevant event arrives.
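To make the subscription idea concrete, here is a minimal in-memory sketch of an event bus that delivers each event only to handlers registered for its type. The class name `InMemoryEventBus` and its `subscribe`/`publish` methods are hypothetical names chosen for illustration; the task itself only defines the JSON messages, not an in-process API.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, Iterable, List

Event = Dict[str, Any]
Handler = Callable[[Event], None]


class InMemoryEventBus:
    """Toy event bus (hypothetical): routes events to handlers by event type."""

    def __init__(self) -> None:
        # event type -> handlers that asked for it
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_types: Iterable[str], handler: Handler) -> None:
        """Register one handler for every listed event type."""
        for event_type in event_types:
            self._handlers[event_type].append(handler)

    def publish(self, event: Event) -> int:
        """Deliver the event to its subscribers; return how many received it."""
        handlers = self._handlers.get(event["type"], [])
        for handler in handlers:
            handler(event)
        return len(handlers)
```

A projector would pass a function that writes into its read model as the handler. Events of a type nobody subscribed to are simply not delivered.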
Implement a node that acts as a projector with subscription and idempotent update support:
// Register interest in specific event types
{ "type": "subscribe", "msg_id": 1,
  "event_types": ["UserCreated", "UserUpdated"] }
-> { "type": "subscribed", "in_reply_to": 1,
     "event_types": ["UserCreated", "UserUpdated"] }

// Event arrives: update all relevant read models
{ "type": "event", "msg_id": 2,
  "event": {"type": "UserCreated",
            "payload": {"id": "user-123", "name": "John"}} }
-> { "type": "read_models_updated", "in_reply_to": 2,
     "event_id": "evt-123",
     "updated_models": ["user_listing", "user_by_email"] }

// Same event_id arrives again: skip it
{ "type": "event", "msg_id": 3,
  "event": {"type": "UserCreated", "id": "evt-123",
            "payload": {"id": "user-123"}} }
-> { "type": "event_skipped", "in_reply_to": 3,
     "event_id": "evt-123", "reason": "already_processed" }

Idempotency is critical: the event bus may deliver the same event more than once. Always check the event_id before applying any update.
Sample Test Cases
Subscribe to events
Timeout: 5000ms
Input
{
  "src": "projector",
  "dest": "eventbus",
  "body": {
    "type": "subscribe",
    "msg_id": 1,
    "event_types": [
      "UserCreated",
      "UserUpdated"
    ]
  }
}
Expected Output
{"type": "subscribed", "in_reply_to": 1, "event_types": ["UserCreated", "UserUpdated"]}

Update read models on event
Timeout: 5000ms
Input
{
  "src": "eventbus",
  "dest": "projector",
  "body": {
    "type": "event",
    "msg_id": 1,
    "event": {
      "type": "UserCreated",
      "payload": {
        "id": "user-123",
        "name": "John"
      }
    }
  }
}
Expected Output
{"type": "read_models_updated", "in_reply_to": 1, "event_id": "evt-123", "updated_models": ["user_listing", "user_by_email"]}

Hints
Hint 1: subscribe registers the projector to receive the listed event types from the event bus.
Hint 2: When an event arrives, update every read model that cares about that event type.
Hint 3: Track processed event IDs; return event_skipped for duplicates (idempotency).
Hint 4: updated_models lists the names of all read models that were actually updated.
Hint 5: Eventual consistency: read models may lag slightly behind the write model.
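Hints 2 to 4 can be combined into one small sketch: a routing table decides which read models care about an event type, a set of processed IDs makes the update idempotent, and updated_models reports only the models whose contents changed. Everything named here (`ROUTES`, `Projector`, the dict-based read models keyed by payload id) is an assumption made for illustration. The sketch also assumes every event carries an `id` field, as in the duplicate-event example; the task does not say how `evt-123` is obtained when the field is absent.

```python
from typing import Any, Dict, List, Set

# Hypothetical routing table: event type -> read models that care about it.
ROUTES: Dict[str, List[str]] = {
    "UserCreated": ["user_listing", "user_by_email"],
    "UserUpdated": ["user_listing", "user_by_email"],
}


class Projector:
    def __init__(self) -> None:
        self.processed: Set[str] = set()  # event ids already applied
        # Assumption: each read model is a dict keyed by the payload's "id".
        self.models: Dict[str, Dict[str, Dict[str, Any]]] = {
            "user_listing": {},
            "user_by_email": {},
        }

    def _apply(self, event: Dict[str, Any]) -> List[str]:
        """Update every routed model; return names of models that changed."""
        payload = event["payload"]
        updated: List[str] = []
        for name in ROUTES.get(event["type"], []):
            model = self.models[name]
            before = model.get(payload["id"])
            after = {**(before or {}), **payload}  # merge new fields over old
            if after != before:
                model[payload["id"]] = after
                updated.append(name)
        return updated

    def handle_event(self, body: Dict[str, Any]) -> Dict[str, Any]:
        event = body["event"]
        event_id = event["id"]  # assumption: the bus always supplies an id

        # Check the id before touching any read model.
        if event_id in self.processed:
            return {"type": "event_skipped", "in_reply_to": body["msg_id"],
                    "event_id": event_id, "reason": "already_processed"}

        updated = self._apply(event)
        self.processed.add(event_id)  # record only after the update succeeded
        return {"type": "read_models_updated", "in_reply_to": body["msg_id"],
                "event_id": event_id, "updated_models": updated}
```

Recording the ID after the update, rather than before, means a failure while applying the event leaves it eligible for redelivery. An unbounded in-memory set is fine for an exercise; a long-running projector would need to persist or prune it.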
Key Concepts
event bus, projector, subscription, idempotent updates, eventual consistency
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
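The starter echoes each message unchanged, whereas the sample tests send an envelope with src, dest, and body and expect only the reply body back. One possible way to bridge that gap is sketched below. `handle_line`, its `on_event` callback, and the `error` reply are hypothetical; the task defines no reply for unsupported message types.

```python
import json
from typing import Any, Callable, Dict, Optional, Set

Body = Dict[str, Any]


def handle_line(line: str, subscriptions: Set[str],
                on_event: Optional[Callable[[Body], Body]] = None) -> str:
    """Unwrap one envelope, dispatch on body type, return the reply as JSON."""
    envelope = json.loads(line)
    body = envelope.get("body", envelope)  # tolerate bare bodies too
    kind = body.get("type")

    if kind == "subscribe":
        # Remember the requested types and echo them back.
        subscriptions.update(body["event_types"])
        reply = {"type": "subscribed", "in_reply_to": body["msg_id"],
                 "event_types": body["event_types"]}
    elif kind == "event" and on_event is not None:
        # Event handling is delegated to a caller-supplied function.
        reply = on_event(body)
    else:
        # Hypothetical error reply; the task does not define one.
        reply = {"type": "error", "in_reply_to": body.get("msg_id"),
                 "text": f"unsupported message type: {kind}"}
    return json.dumps(reply)
```

Inside the starter's loop this would replace the echo with something like `print(handle_line(line, subscriptions, on_event), flush=True)`, where `subscriptions` is a set created once before the loop.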