TASK
Implementation
Cross-shard JOINs are expensive because they may require moving data across the network. The optimal strategy depends on how tables are partitioned.
Co-located joins (best case):
- Both tables are hash-partitioned by the join key
- Example: users and orders both partitioned by user_id
- Each shard can perform the join locally on its partition
- Coordinator merges partial results
- Zero network overhead after the initial scatter
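The co-located case above can be sketched in a few lines. This is a minimal illustration, not the reference solution: the helpers `partition`, `local_hash_join`, and `colocated_join`, the modulo partitioner, and the sample rows are all assumptions made for the example.

```python
def partition(rows, key, num_shards):
    # Hypothetical partitioner: integer key modulo shard count.
    shards = [[] for _ in range(num_shards)]
    for row in rows:
        shards[row[key] % num_shards].append(row)
    return shards

def local_hash_join(left, right, key):
    # Build a hash index on the left rows, then probe it with the right rows.
    index = {}
    for row in left:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for r in right for l in index.get(r[key], [])]

def colocated_join(left_shards, right_shards, key):
    # Shard i of each table holds the same keys, so every join is local;
    # the coordinator only concatenates the partial results.
    results = []
    for left, right in zip(left_shards, right_shards):
        results.extend(local_hash_join(left, right, key))
    return results

users = [{"user_id": 1, "name": "ann"}, {"user_id": 2, "name": "bob"},
         {"user_id": 3, "name": "cy"}]
orders = [{"user_id": 1, "order_id": 10}, {"user_id": 3, "order_id": 11},
          {"user_id": 1, "order_id": 12}]
joined = colocated_join(partition(users, "user_id", 3),
                        partition(orders, "user_id", 3), "user_id")
```

Because both tables use the same partitioner on the join key, no row ever has to leave its shard, which is why shuffle_bytes stays at 0 in this case.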
Example co-located join:
Request: {"type": "join_query", "msg_id": 1, "left": "users", "right": "orders", "on": "user_id"}
Response: {"type": "join_query_ok", "in_reply_to": 1, "results": [...], "strategy": "co-located", "shuffle_bytes": 0}
Shuffle joins (worst case):
- Tables are partitioned by different keys
- Example: users by user_id, orders by order_id
- Must repartition both tables by the join key
- Or broadcast the smaller table to all shards
- High network overhead
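A repartitioning step might look like the sketch below. It assumes integer join keys, a modulo partitioner, and that the size of a moved row is measured as the length of its JSON encoding; the task text does not specify how shuffle_bytes is computed, so that measure and the function name `hash_shuffle` are illustrative only.

```python
import json

def hash_shuffle(shards, key):
    # Move every row to shard (row[key] % n) and count the bytes of rows
    # that had to leave the shard they started on.
    n = len(shards)
    new_shards = [[] for _ in range(n)]
    moved_bytes = 0
    for src, rows in enumerate(shards):
        for row in rows:
            dest = row[key] % n
            if dest != src:
                moved_bytes += len(json.dumps(row).encode("utf-8"))
            new_shards[dest].append(row)
    return new_shards, moved_bytes

# reviews start out partitioned by review_id across two shards
reviews = [{"review_id": 1, "user_id": 2}, {"review_id": 2, "user_id": 1},
           {"review_id": 4, "user_id": 2}]
by_review = [[], []]
for row in reviews:
    by_review[row["review_id"] % 2].append(row)

by_user, shuffle_bytes = hash_shuffle(by_review, "user_id")
```

After both tables have been shuffled this way, matching keys sit on the same shard and each shard can run an ordinary local join.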
Example shuffle join:
Request: {"type": "join_query", "msg_id": 2, "left": "users", "right": "reviews", "on": "user_id"}
Response: {"type": "join_query_ok", "in_reply_to": 2, "results": [...], "strategy": "hash-shuffle", "shuffle_bytes": 5242880}
Implementation strategies:
- Check partitioning metadata for both tables
- If co-located, execute local joins on each shard
- If not co-located, choose the cheapest strategy:
  - Broadcast join if one table is small (< 1000 rows)
  - Hash shuffle if both tables are large
- Return the strategy used and shuffle_bytes for visibility
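The decision steps above could be expressed as a small planner function. This is a sketch under stated assumptions: the `catalog` layout, its row counts, the `countries` table, and the strategy name "broadcast" are invented for illustration (only "co-located" and "hash-shuffle" appear in the sample responses), and it assumes both tables use the same hash function and shard count.

```python
BROADCAST_ROW_LIMIT = 1000  # threshold taken from the task text

def choose_strategy(left_meta, right_meta, on):
    # Co-located only when both tables are partitioned by the join key.
    if left_meta["partition_key"] == on and right_meta["partition_key"] == on:
        return "co-located"
    # Otherwise broadcast when the smaller side is below the row limit.
    if min(left_meta["rows"], right_meta["rows"]) < BROADCAST_ROW_LIMIT:
        return "broadcast"
    return "hash-shuffle"

# Hypothetical partitioning metadata; the numbers are made up.
catalog = {
    "users": {"partition_key": "user_id", "rows": 50000},
    "orders": {"partition_key": "user_id", "rows": 200000},
    "reviews": {"partition_key": "review_id", "rows": 80000},
    "countries": {"partition_key": "country_id", "rows": 200},
}

plan_orders = choose_strategy(catalog["users"], catalog["orders"], "user_id")
plan_reviews = choose_strategy(catalog["users"], catalog["reviews"], "user_id")
plan_countries = choose_strategy(catalog["users"], catalog["countries"], "country_id")
```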
Sample Test Cases
Co-located join (same partition key)
Timeout: 5000ms
Input
{"src":"c0","dest":"coord","body":{"type":"init","msg_id":1,"shards":["s1","s2","s3"]}}
{"src":"c1","dest":"coord","body":{"type":"join_query","msg_id":2,"left":"users","right":"orders","on":"user_id"}}
Expected Output
{"src": "coord", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
Shuffle join (different partition keys)
Timeout: 5000ms
Input
{"src":"c0","dest":"coord","body":{"type":"init","msg_id":1,"shards":["s1","s2"]}}
{"src":"c1","dest":"coord","body":{"type":"join_query","msg_id":2,"left":"users","right":"reviews","on":"user_id"}}
Expected Output
{"src": "coord", "dest": "c0", "body": {"type": "init_ok", "in_reply_to": 1, "msg_id": 0}}
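The sample cases suggest the reply envelope: src and dest are swapped, in_reply_to echoes the request's msg_id, and the coordinator numbers its own messages from 0. A minimal sketch of that handshake follows; the `handle` function and the `state` dictionary are assumptions for illustration, and join_query handling is deliberately left out.

```python
import json

def handle(msg, state):
    body = msg["body"]
    if body["type"] != "init":
        return None  # other message types are not covered by this sketch
    # Remember the shard list announced by the client.
    state["shards"] = list(body["shards"])
    reply_body = {"type": "init_ok",
                  "in_reply_to": body["msg_id"],
                  "msg_id": state["next_msg_id"]}
    state["next_msg_id"] += 1
    return {"src": msg["dest"], "dest": msg["src"], "body": reply_body}

state = {"next_msg_id": 0}
line = '{"src":"c0","dest":"coord","body":{"type":"init","msg_id":1,"shards":["s1","s2","s3"]}}'
reply = handle(json.loads(line), state)
print(json.dumps(reply))
```

With the first sample input, the reply has the same fields and values as the expected init_ok line.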
Hints
Hint 1: If both tables are partitioned by the same key (co-located), join locally on each shard.
Hint 2: If tables are partitioned differently, you need to shuffle data across shards.
Hint 3: Broadcast join: send the smaller table to all shards, then join locally.
Hint 4: Hash shuffle: repartition both tables by the join key, then join locally.
Hint 5: Track the "shuffle_bytes" metric to measure network overhead.
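The broadcast idea from the hints, together with the shuffle_bytes metric, might be sketched as follows. It assumes the small table is shipped in full from the coordinator to every shard and that its cost is the length of its JSON encoding times the number of shards; the function name and sample rows are hypothetical.

```python
import json

def broadcast_join(small, big_shards, key):
    # Cost model (an assumption): one full copy of the small table per shard.
    payload = len(json.dumps(small).encode("utf-8"))
    shuffle_bytes = payload * len(big_shards)
    # Each shard indexes its copy of the small table and probes it locally.
    index = {}
    for row in small:
        index.setdefault(row[key], []).append(row)
    results = []
    for shard in big_shards:
        for row in shard:
            for match in index.get(row[key], []):
                results.append({**match, **row})
    return results, shuffle_bytes

countries = [{"country_id": 1, "country": "NZ"}]
user_shards = [[{"user_id": 1, "country_id": 1}],
               [{"user_id": 2, "country_id": 2}, {"user_id": 3, "country_id": 1}]]
rows, shuffle_bytes = broadcast_join(countries, user_shards, "country_id")
```

The large table never moves, so the cost grows with the small table's size and the shard count rather than with the size of the large table.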
OVERVIEW
Key Concepts
distributed joins, hash partitioning, co-located joins, shuffle joins, join reordering, network overhead
main.py
#!/usr/bin/env python3
import sys
import json

def main():
    # Your implementation here
    for line in sys.stdin:
        msg = json.loads(line)
        print(json.dumps(msg), flush=True)

if __name__ == "__main__":
    main()