Skip to content

graflo.migrate.emitters

Backend emitters for migration execution.

ArangoEmitter

Bases: BaseEmitter

Safe v1 Arango executor: additive operations only.

Source code in graflo/migrate/emitters/arango.py
class ArangoEmitter(BaseEmitter):
    """ArangoDB migration emitter (v1), restricted to additive operations."""

    @property
    def backend_name(self) -> str:
        """Identifier of this backend: ``"arango"``."""
        return "arango"

    def supports(self, operation: MigrationOperation) -> bool:
        """Return True when the operation's ``op_type`` is in ``SUPPORTED_OPS``."""
        return operation.op_type in SUPPORTED_OPS

    def dry_run_message(
        self,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Describe the operation without touching the database.

        ``target_schema`` is accepted for interface compatibility and is not
        used when building the message.
        """
        del target_schema  # intentionally unused
        return f"[arango] would apply {operation.op_type} on {operation.target}"

    def execute(
        self,
        conn: Connection,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Apply ``operation`` by ensuring the target schema on ``conn``.

        Args:
            conn: Connection on which the schema is ensured.
            operation: Migration step to apply.
            target_schema: Schema handed to ``_ensure_schema``.

        Returns:
            A short message naming the applied operation and its target.

        Raises:
            ValueError: If ``supports(operation)`` is false.
        """
        if self.supports(operation):
            self._ensure_schema(conn, target_schema)
            return f"[arango] applied {operation.op_type} on {operation.target}"

        message = f"Operation not supported by arango v1 emitter: {operation.op_type}"
        raise ValueError(message)

BaseEmitter

Bases: ABC

Backend adapter contract for migration execution.

Source code in graflo/migrate/emitters/base.py
class BaseEmitter(ABC):
    """Abstract interface that every migration backend emitter implements.

    Concrete emitters supply ``backend_name``, ``supports``,
    ``dry_run_message`` and ``execute``; the private helpers defined here
    are available to all of them.
    """

    @property
    @abstractmethod
    def backend_name(self) -> str:
        """Name identifying the backend this emitter targets."""

    @abstractmethod
    def supports(self, operation: MigrationOperation) -> bool:
        """Tell whether this backend handles ``operation`` in v1."""

    @abstractmethod
    def dry_run_message(
        self,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Return a description of what applying ``operation`` would do."""

    @abstractmethod
    def execute(
        self,
        conn: Connection,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Run ``operation`` and return a short description of the action."""

    @staticmethod
    def _is_additive_operation(operation: MigrationOperation) -> bool:
        """Report whether the operation's ``op_type`` begins with ``ADD_``."""
        op_type = operation.op_type
        return op_type.startswith("ADD_")

    def _ensure_schema(self, conn: Connection, schema: Schema) -> None:
        """Make sure the target schema artifacts exist (idempotent where supported).

        Delegates to the connection: the schema is defined first, then its
        indexes.
        """
        # Order matters: schema definition precedes index definition.
        conn.define_schema(schema)
        conn.define_indexes(schema)

backend_name abstractmethod property

Backend identifier.

dry_run_message(operation, *, target_schema) abstractmethod

Describe what would happen for the operation.

Source code in graflo/migrate/emitters/base.py
@abstractmethod
def dry_run_message(
    self,
    operation: MigrationOperation,
    *,
    target_schema: Schema,
) -> str:
    """Return a description of what applying ``operation`` would do.

    Abstract: concrete emitters provide the implementation.
    ``target_schema`` must be passed by keyword.
    """

execute(conn, operation, *, target_schema) abstractmethod

Execute operation and return a concise action description.

Source code in graflo/migrate/emitters/base.py
@abstractmethod
def execute(
    self, conn: Connection, operation: MigrationOperation, *, target_schema: Schema
) -> str:
    """Run ``operation`` and return a short description of the action.

    Abstract: concrete emitters provide the implementation.
    ``target_schema`` must be passed by keyword.
    """

supports(operation) abstractmethod

Return whether operation is supported for this backend in v1.

Source code in graflo/migrate/emitters/base.py
@abstractmethod
def supports(self, operation: MigrationOperation) -> bool:
    """Tell whether this backend handles ``operation`` in v1.

    Abstract: concrete emitters provide the implementation.
    """

Neo4jEmitter

Bases: BaseEmitter

Safe v1 Neo4j executor: additive operations only.

Source code in graflo/migrate/emitters/neo4j.py
class Neo4jEmitter(BaseEmitter):
    """Neo4j migration emitter (v1), restricted to additive operations."""

    @property
    def backend_name(self) -> str:
        """Identifier of this backend: ``"neo4j"``."""
        return "neo4j"

    def supports(self, operation: MigrationOperation) -> bool:
        """Return True when the operation's ``op_type`` is in ``SUPPORTED_OPS``."""
        return operation.op_type in SUPPORTED_OPS

    def dry_run_message(
        self,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Describe the operation without touching the database.

        ``target_schema`` is accepted for interface compatibility and is not
        used when building the message.
        """
        del target_schema  # intentionally unused
        return f"[neo4j] would apply {operation.op_type} on {operation.target}"

    def execute(
        self,
        conn: Connection,
        operation: MigrationOperation,
        *,
        target_schema: Schema,
    ) -> str:
        """Apply ``operation`` by ensuring the target schema on ``conn``.

        Args:
            conn: Connection on which the schema is ensured.
            operation: Migration step to apply.
            target_schema: Schema handed to ``_ensure_schema``.

        Returns:
            A short message naming the applied operation and its target.

        Raises:
            ValueError: If ``supports(operation)`` is false.
        """
        if self.supports(operation):
            self._ensure_schema(conn, target_schema)
            return f"[neo4j] applied {operation.op_type} on {operation.target}"

        message = f"Operation not supported by neo4j v1 emitter: {operation.op_type}"
        raise ValueError(message)