TASK
Implementation
Batch MapReduce waits for all data before producing output. Stream processing handles an infinite flow of events: state is updated as each event arrives, and results can be queried at any time.
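To make the contrast with batch processing concrete, here is a minimal sketch of a running aggregate whose state survives from one batch of events to the next. The class name `RunningWordCount` and its `process` method are illustrative names, not part of the task statement.

```python
from collections import Counter


class RunningWordCount:
    """Holds word totals that persist across every batch of events."""

    def __init__(self):
        self.totals = Counter()

    def process(self, words):
        # Add this batch to the global totals; nothing is ever reset.
        self.totals.update(words)
        # Return a plain-dict snapshot that can be queried at any time.
        return dict(self.totals)


state = RunningWordCount()
first = state.process(["hello", "world", "hello"])
second = state.process(["hello", "stream"])
print(first)   # {'hello': 2, 'world': 1}
print(second)  # {'hello': 3, 'world': 1, 'stream': 1}
```

The second call reports `hello` as 3 rather than 1 because the totals carry over between calls.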
Your node maintains a running word count across all received messages:
// Process a batch of words — update running counts
{ "type": "process", "msg_id": 1, "words": ["hello", "world", "hello"] }
→ { "type": "processed", "in_reply_to": 1, "counts": {"hello": 2, "world": 1} }
// Return the top N words by count
{ "type": "topn", "msg_id": 2, "n": 2,
"counts": {"hello": 5, "world": 3, "stream": 1} }
→ { "type": "topn", "in_reply_to": 2,
"top_words": [["hello", 5], ["world", 3]] }
// Increment a single word and return its new count
{ "type": "update", "msg_id": 3, "word": "hello", "current_count": 5 }
→ { "type": "updated", "in_reply_to": 3, "word": "hello", "new_count": 6 }
// Emit top N from current in-memory state (periodic output)
{ "type": "output", "msg_id": 4, "interval_ms": 1000, "counts": {"hello": 10} }
→ { "type": "periodic_output", "in_reply_to": 4,
"top_words": [["hello", 10]] }
Unlike batch processing, the node never resets counts between messages: every process call adds to the global running totals.
Sample Test Cases
Process word stream
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "process",
    "msg_id": 1,
    "words": [
      "hello",
      "world",
      "hello"
    ]
  }
}
Expected Output
{"type": "processed", "in_reply_to": 1, "counts": {"hello": 2, "world": 1}}
Output top N words
Timeout: 5000ms
Input
{
  "src": "stream",
  "dest": "processor",
  "body": {
    "type": "topn",
    "msg_id": 1,
    "n": 2,
    "counts": {
      "hello": 5,
      "world": 3,
      "stream": 1
    }
  }
}
Expected Output
{"type": "topn", "in_reply_to": 1, "top_words": [["hello", 5], ["world", 3]]}
Hints
Hint 1: Keep a running word-count dict in memory and update it on every process message.
Hint 2: For topn, sort the dict by count in descending order and return the first N entries.
Hint 3: update increments a single word by 1 and returns the new count.
Hint 4: output emits the top N words from the current state without resetting it.
Hint 5: Lowercase and strip each word before counting.
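The top-N and normalization hints can be sketched as two small helpers. The names `normalize` and `top_n` are illustrative. Two points are assumptions because the task does not specify them: ties are broken alphabetically, and "strip" is read as removing surrounding whitespace.

```python
def normalize(word):
    # Assumption: "strip" means trimming surrounding whitespace only.
    return word.strip().lower()


def top_n(counts, n):
    # Sort by count descending; break ties alphabetically
    # (an assumption, since the task does not define tie order).
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    # Emit [word, count] pairs, matching the "top_words" shape in the examples.
    return [[word, count] for word, count in ranked[:n]]


print(top_n({"hello": 5, "world": 3, "stream": 1}, 2))  # [['hello', 5], ['world', 3]]
print(normalize("  Hello "))                            # hello
```

A full sort is O(k log k) in the number of distinct words. For a very large vocabulary, `heapq.nlargest` would be a reasonable alternative.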
OVERVIEW
Theoretical Hub
Key Concepts
stream processing, stateful processing, running aggregates, top-N, incremental updates
main.py
python
#!/usr/bin/env python3
import sys
import json

def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)

if __name__ == "__main__":
    main()
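One possible way to fill in the starter is sketched below. It is not a reference solution. It assumes one JSON message per input line, as in the starter loop, and replies with the body only, as in the sample expected outputs. The helpers `handle` and `run` are hypothetical names. Where a request carries its own `counts` or `current_count`, this sketch uses that value and otherwise falls back to the in-memory totals; the task statement does not say which should take precedence.

```python
import json
from collections import Counter


def top_n(counts, n=None):
    # Highest count first; ties broken alphabetically (assumed, not specified).
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [[w, c] for w, c in ranked[:n]]  # n=None keeps every entry


def handle(body, totals):
    """Build the reply body for one request; `totals` is the shared Counter."""
    kind, ref = body["type"], body["msg_id"]
    if kind == "process":
        # Normalize, then add to the running totals (never reset).
        totals.update(w.strip().lower() for w in body["words"])
        return {"type": "processed", "in_reply_to": ref, "counts": dict(totals)}
    if kind == "topn":
        # Assumption: prefer counts supplied in the request, else use totals.
        source = body.get("counts", totals)
        return {"type": "topn", "in_reply_to": ref,
                "top_words": top_n(source, body["n"])}
    if kind == "update":
        # Assumption: start from current_count when given, else the stored total.
        word = body["word"]
        totals[word] = body.get("current_count", totals[word]) + 1
        return {"type": "updated", "in_reply_to": ref,
                "word": word, "new_count": totals[word]}
    if kind == "output":
        # The example request has no "n", so every word is returned here.
        source = body.get("counts", totals)
        return {"type": "periodic_output", "in_reply_to": ref,
                "top_words": top_n(source)}
    raise ValueError(f"unknown message type: {kind!r}")


def run(lines):
    totals = Counter()  # lives for the whole stream
    for line in lines:
        if line.strip():
            msg = json.loads(line)
            print(json.dumps(handle(msg["body"], totals)), flush=True)
```

Calling `run(sys.stdin)` from `main()` (with `import sys`) would connect this to the starter's input loop.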