TASK
Implementation
Data migrations transform existing data to match a new schema or business rule. Unlike schema migrations, which alter structure, they touch every row. Running them during peak traffic causes lock contention, so they must run in small batches, be idempotent (safe to re-run), and be validated before the old schema is removed.
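One way to combine batching and idempotency is keyset pagination plus a guard clause in the UPDATE. A minimal sketch against an in-memory SQLite table (the schema and three sample rows are illustrative, not part of the exercise):

```python
import sqlite3

# Illustrative schema: a users table where full_name was never populated.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, "
           "first TEXT, last TEXT, full_name TEXT)")
db.executemany("INSERT INTO users (first, last) VALUES (?, ?)",
               [("Ada", "Lovelace"), ("Alan", "Turing"), ("Grace", "Hopper")])

BATCH_SIZE = 2
last_id = 0
total_processed = total_updated = 0
while True:
    # Keyset pagination: small batches keep each transaction (and lock) short.
    rows = db.execute("SELECT id FROM users WHERE id > ? ORDER BY id LIMIT ?",
                      (last_id, BATCH_SIZE)).fetchall()
    if not rows:
        break
    ids = [r[0] for r in rows]
    # The "full_name IS NULL" guard makes a re-run a no-op: already-backfilled
    # rows are processed again but never updated twice.
    cur = db.execute(
        "UPDATE users SET full_name = first || ' ' || last "
        "WHERE id >= ? AND id <= ? AND full_name IS NULL",
        (ids[0], ids[-1]))
    db.commit()
    total_processed += len(ids)
    total_updated += cur.rowcount
    last_id = ids[-1]

print(total_processed, total_updated)  # prints "3 3"; a second run updates 0
```

The processed and updated counters diverge exactly as the protocol below expects: every row is visited, but only rows that still need the value are written.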
Implement a node that manages data migrations:
// Backfill full_name column for 10,000 existing users in batches
{ "type": "backfill", "msg_id": 1,
"table": "users", "column": "full_name",
"batch_size": 1000, "total_rows": 10000 }
-> { "type": "backfill_complete", "in_reply_to": 1,
"total_processed": 10000, "total_updated": 9500,
"duration_seconds": 60 }
// Validate the migrated data meets constraints
{ "type": "validate", "msg_id": 2,
"table": "users",
"validations": ["no_nulls", "email_format"] }
-> { "type": "validation_results", "in_reply_to": 2,
"results": [
{"name": "no_nulls", "passed": true, "failed_rows": 0},
{"name": "email_format", "passed": true, "failed_rows": 0}
]}
// Running migration 3 times produces the same final state
{ "type": "migrate", "msg_id": 3,
"idempotent": true, "table": "users", "runs": 3 }
-> { "type": "migration_complete", "in_reply_to": 3,
"rows_updated": 100, "final_state": "unchanged" }Sample Test Cases
Backfill data in batches (Timeout: 10000ms)
Input
{
"src": "admin",
"dest": "migrations",
"body": {
"type": "backfill",
"msg_id": 1,
"table": "users",
"column": "full_name",
"batch_size": 1000,
"total_rows": 10000
}
}

Expected Output
{"type": "backfill_complete", "in_reply_to": 1, "total_processed": 10000, "total_updated": 9500, "duration_seconds": 60}Validate migrated dataTimeout: 5000ms
Input
{
"src": "admin",
"dest": "migrations",
"body": {
"type": "validate",
"msg_id": 1,
"table": "users",
"validations": [
"no_nulls",
"email_format"
]
}
}

Expected Output
{"type": "validation_results", "in_reply_to": 1, "results": [{"name": "no_nulls", "passed": true, "failed_rows": 0}, {"name": "email_format", "passed": true, "failed_rows": 0}]}Hints
Hint 1
Backfill in batches: process rows WHERE id > last_processed_id LIMIT batch_size
Hint 2
Track total_processed and total_updated separately (some rows may already be correct)
Hint 3
Idempotent: running the migration twice should produce the same result, not double-update
Hint 4
Validate after migration: check constraints like no_nulls and format rules
Hint 5
Rollback on validation failure: restore from backup if post-migration checks fail
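The validation hints can be sketched as simple row scans that produce the validation_results body from the protocol. The sample rows and the email regex here are illustrative approximations, not part of the exercise:

```python
import re

# Illustrative migrated rows; in the real node these would come from the table.
rows = [
    {"full_name": "Ada Lovelace", "email": "ada@example.com"},
    {"full_name": "Alan Turing", "email": "alan@example.com"},
]

# Rough email shape check, not a full RFC 5322 validator.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def run_validations(rows, names):
    """Return one result dict per requested validation, shaped like the
    validation_results body shown in the sample test cases."""
    checks = {
        "no_nulls": lambda r: r["full_name"] is not None,
        "email_format": lambda r: bool(EMAIL_RE.match(r["email"])),
    }
    results = []
    for name in names:
        failed = sum(1 for r in rows if not checks[name](r))
        results.append({"name": name, "passed": failed == 0,
                        "failed_rows": failed})
    return results

print(run_validations(rows, ["no_nulls", "email_format"]))
```

Counting failed rows rather than stopping at the first failure matches the failed_rows field, and gives enough detail to decide between fixing forward and rolling back.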
OVERVIEW
Key Concepts
data backfill, batch processing, idempotent migration, data validation, rollback on failure
main.py
python
#!/usr/bin/env python3
import sys
import json


def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)


if __name__ == "__main__":
    main()
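The echo stub above can be grown into a dispatcher. A minimal sketch, assuming the src/dest/body envelope shown in the sample test cases; the row counts in the replies are simulated, since the sketch has no database attached:

```python
#!/usr/bin/env python3
import sys
import json


def handle(body):
    """Map a request body to a reply body, per the exercise's protocol."""
    t = body["type"]
    if t == "backfill":
        total, batch = body["total_rows"], body["batch_size"]
        processed = 0
        # Walk the table in batches so no single statement touches every row.
        while processed < total:
            processed += min(batch, total - processed)
        return {"type": "backfill_complete", "in_reply_to": body["msg_id"],
                "total_processed": processed,
                # Simulated: assume some rows already had the value,
                # so total_updated < total_processed.
                "total_updated": int(processed * 0.95),
                "duration_seconds": 60}
    if t == "validate":
        # Simulated: report every requested validation as passing.
        return {"type": "validation_results", "in_reply_to": body["msg_id"],
                "results": [{"name": n, "passed": True, "failed_rows": 0}
                            for n in body["validations"]]}
    if t == "migrate":
        # Re-running an idempotent migration leaves the final state unchanged.
        return {"type": "migration_complete", "in_reply_to": body["msg_id"],
                "rows_updated": 100, "final_state": "unchanged"}
    return None


def main():
    for line in sys.stdin:
        msg = json.loads(line)
        reply = handle(msg["body"])
        if reply is not None:
            print(json.dumps({"src": msg["dest"], "dest": msg["src"],
                              "body": reply}), flush=True)


if __name__ == "__main__":
    main()
```

Keeping `handle` as a pure body-to-body function leaves the stdin/stdout framing in `main`, which makes the protocol logic easy to unit-test without piping JSON.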