TASK
Implementation
The query side is the read path in CQRS. Instead of querying the write model directly (which is normalized for writes), it reads from pre-built read models (projections) that are denormalized and indexed for the specific query being served. Query results can also be cached to avoid redundant work.
Implement a node that serves three query types:
// Paginated user list from a denormalized listing projection
{ "type": "GetUserListing", "msg_id": 1,
"params": {"page": 1, "limit": 10} }
-> { "type": "query_result", "in_reply_to": 1,
"data": [{"id": "user-123", "name": "John Doe"}],
"cached": false }
// Email lookup via index (second call returns from cache)
{ "type": "GetUserByEmail", "msg_id": 2,
"params": {"email": "john@example.com"} }
-> { "type": "query_result", "in_reply_to": 2,
"data": {"email": "john@example.com", "userId": "user-123"},
"cached": true }
// Filtered query: users in a specific city
{ "type": "GetUsersByCity", "msg_id": 3,
"params": {"city": "NYC"} }
-> { "type": "query_result", "in_reply_to": 3,
"data": [{"city": "NYC", "userId": "user-123"}],
"cached": false }The cached field indicates whether the result was served from the query cache. A query handler never modifies any state.
Sample Test Cases
Query user listingTimeout: 5000ms
Input
{
"src": "client",
"dest": "queryside",
"body": {
"type": "GetUserListing",
"msg_id": 1,
"params": {
"page": 1,
"limit": 10
}
}
}Expected Output
{"type": "query_result", "in_reply_to": 1, "data": [{"id": "user-123", "name": "John Doe"}], "cached": false}Query with cache hitTimeout: 5000ms
Input
{
"src": "client",
"dest": "queryside",
"body": {
"type": "GetUserByEmail",
"msg_id": 1,
"params": {
"email": "john@example.com"
}
}
}Expected Output
{"type": "query_result", "in_reply_to": 1, "data": {"email": "john@example.com", "userId": "user-123"}, "cached": true}Hints
Hint 1▾
Read models are denormalized: pre-join and pre-aggregate data for fast reads
Hint 2▾
Cache query results with a TTL; return cached=true when the result comes from cache
Hint 3▾
GetUserListing uses pagination (page, limit) to return a slice of the users list
Hint 4▾
GetUserByEmail uses an email index for O(1) lookup rather than a full scan
Hint 5▾
The query side never writes — it reads from projections built by the event side
OVERVIEW
Theoretical Hub
Concept overview coming soon
Key Concepts
read modelquery optimizationcachingdenormalizationpagination
main.py
python
1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python3
import sys
import json
def main():
# Your implementation here
for line in sys.stdin:
msg = json.loads(line)
print(json.dumps(msg), flush=True)
if __name__ == "__main__":
main()