Translate schema diff result into ordered execution plan.
Source code in graflo/migrate/planner.py
| class MigrationPlanner:
"""Translate schema diff result into ordered execution plan."""
def __init__(self, allow_high_risk: bool = False):
self.allow_high_risk = allow_high_risk
def build(self, diff_result: SchemaDiffResult) -> MigrationPlan:
"""Build ordered plan with risk gate filtering."""
ordered_ops = sorted(
diff_result.operations,
key=lambda op: (OP_ORDER.get(op.op_type, 9999), op.target),
)
runnable: list[MigrationOperation] = []
blocked: list[MigrationOperation] = []
for op in ordered_ops:
if self.allow_high_risk or is_low_risk(op):
runnable.append(op)
else:
blocked.append(op)
warnings = list(diff_result.warnings)
if blocked and not self.allow_high_risk:
warnings.append(
"High-risk operations are blocked by default. Re-run with explicit allow flag in future guarded workflow."
)
return MigrationPlan(
operations=runnable,
blocked_operations=blocked,
warnings=warnings,
)
|
build(diff_result)
Build ordered plan with risk gate filtering.
Source code in graflo/migrate/planner.py
| def build(self, diff_result: SchemaDiffResult) -> MigrationPlan:
"""Build ordered plan with risk gate filtering."""
ordered_ops = sorted(
diff_result.operations,
key=lambda op: (OP_ORDER.get(op.op_type, 9999), op.target),
)
runnable: list[MigrationOperation] = []
blocked: list[MigrationOperation] = []
for op in ordered_ops:
if self.allow_high_risk or is_low_risk(op):
runnable.append(op)
else:
blocked.append(op)
warnings = list(diff_result.warnings)
if blocked and not self.allow_high_risk:
warnings.append(
"High-risk operations are blocked by default. Re-run with explicit allow flag in future guarded workflow."
)
return MigrationPlan(
operations=runnable,
blocked_operations=blocked,
warnings=warnings,
)
|