TASK
Implementation
Job monitoring gives operators visibility into what is running, how long it takes, and when things go wrong. Without it, a failed job can go undetected for hours.
Implement a node that tracks job lifecycle events and exposes aggregate statistics:
// Record a status update with progress percentage
{ "type": "update_status", "msg_id": 1,
"job_id": "job-123", "status": "running", "progress": 25 }
-> { "type": "status_updated", "in_reply_to": 1,
"job_id": "job-123", "status": "running", "progress": 25 }
// Record completion with timing and resource metrics
{ "type": "job_completed", "msg_id": 2,
"job_id": "job-123", "duration_ms": 60000,
"resource_usage": {"cpu_percent": 75, "memory_mb": 1024} }
-> { "type": "job_completed", "in_reply_to": 2,
"job_id": "job-123", "duration_ms": 60000,
"resource_usage": {"cpu_percent": 75, "memory_mb": 1024} }
// Job fails after max retries -> send an alert
{ "type": "job_failed", "msg_id": 3,
"job_id": "job-123", "error": "Connection timeout", "retries": 3 }
-> { "type": "alert_sent", "in_reply_to": 3,
"job_id": "job-123",
"alert": "Job failed after 3 retries: Connection timeout" }
// Aggregate statistics across all tracked jobs
{ "type": "get_stats", "msg_id": 4 }
-> { "type": "job_stats", "in_reply_to": 4,
"total": 100, "completed": 85, "failed": 5, "avg_duration_ms": 5000 }

Sample Test Cases
Track job status updates
Timeout: 5000ms
Input
{
"src": "worker",
"dest": "monitor",
"body": {
"type": "update_status",
"msg_id": 1,
"job_id": "job-123",
"status": "running",
"progress": 25
}
}
Expected Output
{"type": "status_updated", "in_reply_to": 1, "job_id": "job-123", "status": "running", "progress": 25}

Record job completion with metrics
Timeout: 5000ms
Input
{
"src": "worker",
"dest": "monitor",
"body": {
"type": "job_completed",
"msg_id": 1,
"job_id": "job-123",
"duration_ms": 60000,
"resource_usage": {
"cpu_percent": 75,
"memory_mb": 1024
}
}
}
Expected Output
{"type": "job_completed", "in_reply_to": 1, "job_id": "job-123", "duration_ms": 60000, "resource_usage": {"cpu_percent": 75, "memory_mb": 1024}}

Hints
Hint 1
update_status stores the current status and progress (0-100) for a job_id.
Hint 2
job_completed records duration_ms and resource_usage alongside the job record.
Hint 3
Fire an alert only when a job fails after exhausting all retries.
Hint 4
get_stats aggregates totals across all jobs: count by status and average duration_ms.
Hint 5
Progress is a percentage (0-100) representing how far through execution the job is.
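
The hints above can be sketched as a small in-memory tracker. This is only one possible reading of the task, not a reference solution. The class name JobMonitor, the MAX_RETRIES limit of 3, the "retrying" status, the clamping of progress, the reply sent for a failure that has not yet exhausted its retries, and the "error" reply for unknown message types are all assumptions; the task statement does not specify them. The average duration is taken over jobs that reported a duration_ms and is rounded to a whole number of milliseconds, which is also an assumption.

```python
from typing import Any, Dict

MAX_RETRIES = 3  # assumption: the task does not state the retry limit


class JobMonitor:
    """Illustrative in-memory job tracker (all names are hypothetical)."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Dict[str, Any]] = {}

    def _job(self, job_id: str) -> Dict[str, Any]:
        # Create the job record on first sight of a job_id.
        return self.jobs.setdefault(job_id, {"status": "unknown"})

    def handle(self, body: Dict[str, Any]) -> Dict[str, Any]:
        kind = body.get("type")
        msg_id = body.get("msg_id")

        if kind == "update_status":
            job = self._job(body["job_id"])
            job["status"] = body["status"]
            # Assumption: out-of-range progress is clamped to 0-100.
            job["progress"] = max(0, min(100, body.get("progress", 0)))
            return {"type": "status_updated", "in_reply_to": msg_id,
                    "job_id": body["job_id"], "status": job["status"],
                    "progress": job["progress"]}

        if kind == "job_completed":
            job = self._job(body["job_id"])
            job["status"] = "completed"
            job["duration_ms"] = body["duration_ms"]
            job["resource_usage"] = body.get("resource_usage", {})
            return {"type": "job_completed", "in_reply_to": msg_id,
                    "job_id": body["job_id"],
                    "duration_ms": job["duration_ms"],
                    "resource_usage": job["resource_usage"]}

        if kind == "job_failed":
            job = self._job(body["job_id"])
            retries = body.get("retries", 0)
            error = body.get("error", "unknown error")
            job["error"] = error
            if retries >= MAX_RETRIES:
                # Retries exhausted: mark the job failed and alert.
                job["status"] = "failed"
                return {"type": "alert_sent", "in_reply_to": msg_id,
                        "job_id": body["job_id"],
                        "alert": f"Job failed after {retries} retries: {error}"}
            # Assumption: below the limit, acknowledge without alerting.
            job["status"] = "retrying"
            return {"type": "job_failed", "in_reply_to": msg_id,
                    "job_id": body["job_id"], "retries": retries}

        if kind == "get_stats":
            records = list(self.jobs.values())
            durations = [j["duration_ms"] for j in records if "duration_ms" in j]
            avg = round(sum(durations) / len(durations)) if durations else 0
            return {"type": "job_stats", "in_reply_to": msg_id,
                    "total": len(records),
                    "completed": sum(j["status"] == "completed" for j in records),
                    "failed": sum(j["status"] == "failed" for j in records),
                    "avg_duration_ms": avg}

        # Assumption: unknown message types get a generic error reply.
        return {"type": "error", "in_reply_to": msg_id,
                "text": f"unsupported message type: {kind}"}
```

Keeping one record per job_id means get_stats can derive every count from the same dictionary, so no separate counters need to be kept in sync with status changes.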
Key Concepts
job monitoring, status tracking, alerting, metrics aggregation, observability
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
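
The starter code echoes the whole incoming message, while the sample tests show the request wrapped in a src/dest/body envelope and the expected output containing only the reply body. A minimal sketch of that unwrapping step follows. The names process_line and ack_status are hypothetical, and printing the bare reply body (rather than a reply envelope) is an assumption drawn from the Expected Output lines in the sample tests.

```python
import json
from typing import Any, Callable, Dict

Body = Dict[str, Any]


def ack_status(body: Body) -> Body:
    """Hypothetical stand-in handler: acknowledges an update_status message."""
    return {
        "type": "status_updated",
        "in_reply_to": body["msg_id"],
        "job_id": body["job_id"],
        "status": body["status"],
        "progress": body["progress"],
    }


def process_line(line: str, handler: Callable[[Body], Body]) -> str:
    """Parse one envelope, pass its body to the handler, serialize the reply."""
    envelope = json.loads(line)
    body = envelope["body"]  # src and dest are ignored in this sketch
    return json.dumps(handler(body))
```

Inside main(), the loop body would then become print(process_line(line, handler), flush=True), with handler replaced by whatever function dispatches on the message type.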