TASK
Implementation
A DAG (Directed Acyclic Graph) scheduler orchestrates workflows where tasks have dependencies. Task B cannot start until Task A finishes, but independent tasks can run in parallel, reducing total execution time.
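To make the time saving concrete, here is a minimal sketch that compares running every task back to back with running independent tasks in parallel. The per-task durations and the `makespan` helper are made up for illustration and are not part of the task statement; the sketch also assumes the graph is already known to be acyclic.

```python
from functools import lru_cache

def makespan(durations, dependencies):
    """Earliest possible completion time when independent tasks overlap."""
    # Map each task to the tasks it must wait for.
    deps_of = {task: [] for task in durations}
    for dep in dependencies:
        deps_of[dep["task"]].append(dep["depends_on"])

    @lru_cache(maxsize=None)
    def finish(task):
        # A task starts once its slowest dependency has finished.
        start = max((finish(d) for d in deps_of[task]), default=0)
        return start + durations[task]

    return max(finish(task) for task in durations)

# Hypothetical durations in seconds (an assumption, not from the task).
durations = {"A": 5, "B": 5, "C": 5}
deps = [{"task": "B", "depends_on": "A"},
        {"task": "C", "depends_on": "A"}]

print(sum(durations.values()))    # sequential: 15
print(makespan(durations, deps))  # B and C overlap: 10
```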
Implement a node that creates, validates, and executes DAG workflows:
// Create a DAG: B and C both depend on A
{ "type": "create_dag", "msg_id": 1,
"tasks": [{"id":"A","command":"run A"},
{"id":"B","command":"run B"},
{"id":"C","command":"run C"}],
"dependencies": [{"task":"B","depends_on":"A"},
{"task":"C","depends_on":"A"}] }
-> { "type": "dag_created", "in_reply_to": 1,
"dag_id": "<uuid>", "valid": true, "tasks": 3 }
// A cycle A->B->C->A is rejected
-> { "type": "dag_invalid", "in_reply_to": 1,
"error": "Cycle detected in DAG" }
// Execute: run A first, then B and C in parallel
{ "type": "execute_dag", "msg_id": 2, "dag_id": "dag-123" }
-> { "type": "dag_executed", "in_reply_to": 2,
"execution_id": "<uuid>", "status": "completed",
"tasks_completed": 3, "duration_seconds": 10 }
When a task fails, all tasks that (directly or transitively) depend on it are also marked failed and listed in failed_tasks.
Sample Test Cases
Create and validate DAG
Timeout: 5000ms
Input
{
"src": "client",
"dest": "dag",
"body": {
"type": "create_dag",
"msg_id": 1,
"tasks": [
{
"id": "A",
"command": "run A"
},
{
"id": "B",
"command": "run B"
},
{
"id": "C",
"command": "run C"
}
],
"dependencies": [
{
"task": "B",
"depends_on": "A"
},
{
"task": "C",
"depends_on": "A"
}
]
}
}
Expected Output
{"type": "dag_created", "in_reply_to": 1, "dag_id": ".*", "valid": true, "tasks": 3}
Execute DAG with parallel tasks
Timeout: 15000ms
Input
{
"src": "executor",
"dest": "dag",
"body": {
"type": "execute_dag",
"msg_id": 1,
"dag_id": "dag-123"
}
}
Expected Output
{"type": "dag_executed", "in_reply_to": 1, "execution_id": ".*", "status": "completed", "tasks_completed": 3, "duration_seconds": 10}
Hints
Hint 1: A DAG is valid if it contains no cycles; use DFS with a visiting set to detect back-edges.
Hint 2: A topological sort gives a safe execution order: a task runs only after all of its dependencies finish.
Hint 3: Tasks with no unmet dependencies can all be launched in parallel.
Hint 4: If task A fails, mark every task that transitively depends on A as failed.
Hint 5: Use Kahn's algorithm or a recursive DFS for topological ordering.
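The hints on ordering, parallel launch, and failure propagation can be sketched together as follows. This is one possible shape, not a reference solution: `plan_waves` and `failed_with_dependents` are hypothetical helper names, the sketch assumes every dependency refers to a declared task id, and it assumes the failed task itself is reported alongside its dependents (the task statement does not say).

```python
from collections import defaultdict, deque

def build_dependents(dependencies):
    """Map each task to the tasks that directly depend on it."""
    dependents = defaultdict(list)
    for dep in dependencies:
        dependents[dep["depends_on"]].append(dep["task"])
    return dependents

def plan_waves(task_ids, dependencies):
    """Kahn's algorithm, grouped into waves that could run in parallel.

    Raises ValueError when some task can never become ready (a cycle).
    """
    indegree = {task_id: 0 for task_id in task_ids}
    for dep in dependencies:
        indegree[dep["task"]] += 1
    dependents = build_dependents(dependencies)

    ready = sorted(t for t, count in indegree.items() if count == 0)
    waves, scheduled = [], 0
    while ready:
        waves.append(ready)
        scheduled += len(ready)
        next_ready = []
        for task_id in ready:
            for child in dependents[task_id]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    if scheduled != len(indegree):
        raise ValueError("Cycle detected in DAG")
    return waves

def failed_with_dependents(dependencies, failed_task):
    """Breadth-first walk over dependents of the failed task."""
    dependents = build_dependents(dependencies)
    failed = {failed_task}
    queue = deque([failed_task])
    while queue:
        for child in dependents[queue.popleft()]:
            if child not in failed:
                failed.add(child)
                queue.append(child)
    return sorted(failed)

deps = [{"task": "B", "depends_on": "A"},
        {"task": "C", "depends_on": "A"},
        {"task": "D", "depends_on": "B"}]
print(plan_waves(["A", "B", "C", "D"], deps))  # [['A'], ['B', 'C'], ['D']]
print(failed_with_dependents(deps, "B"))       # ['B', 'D']
```

Each inner list is a set of tasks whose dependencies are all satisfied by earlier waves, so an executor could start a whole wave at once (for example with a thread pool) before moving to the next.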
Key Concepts
DAG, topological sort, task dependencies, cycle detection, parallel execution
main.py
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
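As a starting point for replacing the echo in the loop above, the following sketch shows one way a `create_dag` body might be validated and answered, using the DFS-with-visiting-set check from Hint 1. The names `has_cycle`, `handle_create_dag`, and the in-memory `dags` store are assumptions for illustration, as is the assumption that every dependency refers to a declared task id. How the reply is wrapped in a `src`/`dest` envelope is not shown in the task, so only the body is built here.

```python
import uuid

def has_cycle(task_ids, dependencies):
    """DFS with a 'visiting' set; reaching a visiting node is a back-edge."""
    deps_of = {task_id: [] for task_id in task_ids}
    for dep in dependencies:
        deps_of[dep["task"]].append(dep["depends_on"])

    visiting, done = set(), set()

    def visit(task_id):
        if task_id in done:
            return False
        if task_id in visiting:
            return True  # back-edge: this task is on the current DFS path
        visiting.add(task_id)
        found = any(visit(d) for d in deps_of[task_id])
        visiting.discard(task_id)
        done.add(task_id)
        return found

    return any(visit(task_id) for task_id in task_ids)

def handle_create_dag(body, dags):
    """Build the reply body for a create_dag request (hypothetical helper)."""
    task_ids = [task["id"] for task in body["tasks"]]
    dependencies = body.get("dependencies", [])
    if has_cycle(task_ids, dependencies):
        return {"type": "dag_invalid", "in_reply_to": body["msg_id"],
                "error": "Cycle detected in DAG"}
    dag_id = str(uuid.uuid4())
    # Keep the DAG in memory so a later execute_dag can look it up.
    dags[dag_id] = {"tasks": body["tasks"], "dependencies": dependencies}
    return {"type": "dag_created", "in_reply_to": body["msg_id"],
            "dag_id": dag_id, "valid": True, "tasks": len(task_ids)}

store = {}
request = {"type": "create_dag", "msg_id": 1,
           "tasks": [{"id": "A", "command": "run A"},
                     {"id": "B", "command": "run B"},
                     {"id": "C", "command": "run C"}],
           "dependencies": [{"task": "B", "depends_on": "A"},
                            {"task": "C", "depends_on": "A"}]}
reply = handle_create_dag(request, store)
print(reply["type"], reply["tasks"])  # dag_created 3
```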