Skip to content

graflo.architecture.contract.ingestion.model

Ingestion model definitions and runtime preparation helpers.

IngestionModel

Bases: ConfigBaseModel

Ingestion model (C): resources and transform registry.

Source code in graflo/architecture/contract/ingestion/model.py
class IngestionModel(ConfigBaseModel):
    """Ingestion model (C): resources and transform registry.

    The public fields hold the declarative configuration (resources and
    named transforms). Private attributes hold name-keyed lookup maps that
    are rebuilt from those fields, plus the per-resource runtimes that
    :meth:`finish_init` constructs.
    """

    edges_on_duplicate: Literal["ignore", "upsert"] = PydanticField(
        default="ignore",
        description=(
            "How batch edge writes tolerate an already-matching edge. Passed through to "
            ":meth:`~graflo.db.conn.Connection.insert_edges_batch` where the target backend "
            "supports it."
        ),
    )
    resources: list[ResourceConfig] = PydanticField(
        default_factory=list,
        description="List of resource definitions (data pipelines mapping to vertices/edges).",
    )
    transforms: list[ProtoTransform] = PydanticField(
        default_factory=list,
        description="List of named transforms available to resources.",
    )

    # Name -> config map; has no default and is filled by _rebuild_resource_map().
    _resources: dict[str, ResourceConfig] = PrivateAttr()
    # Name -> runtime map; stays empty until finish_init() has been called.
    _runtimes: dict[str, ResourceRuntime] = PrivateAttr(default_factory=dict)
    # Name -> transform map; filled by _rebuild_transform_map().
    _transforms: dict[str, ProtoTransform] = PrivateAttr(default_factory=dict)
    _combined_edge_derivation: EdgeDerivationRegistry = PrivateAttr(
        default_factory=EdgeDerivationRegistry
    )

    @model_validator(mode="after")
    def _init_model(self) -> IngestionModel:
        """Populate the private lookup maps after field validation.

        Returns:
            This model instance, as expected from an ``after`` validator.
        """
        self._rebuild_config_state()
        return self

    def _rebuild_resource_map(self) -> None:
        """Check that resource names are unique, then refresh ``_resources``.

        Raises:
            ValueError: If a resource name occurs more than once; the first
                repeated name (in declaration order) is reported.
        """
        usage = Counter(resource.name for resource in self.resources)
        repeated = next(
            ((name, count) for name, count in usage.items() if count > 1),
            None,
        )
        if repeated is not None:
            name, count = repeated
            raise ValueError(f"resource name {name} used {count} times")

        by_name = {resource.name: resource for resource in self.resources}
        object.__setattr__(self, "_resources", by_name)

    def _rebuild_transform_map(self) -> None:
        """Check transform names, then refresh the ``_transforms`` lookup.

        Raises:
            ValueError: If any transform has an empty or missing name, or if
                two transforms share a name.
        """
        unnamed_positions = [
            position
            for position, transform in enumerate(self.transforms)
            if not transform.name
        ]
        if unnamed_positions:
            raise ValueError(
                "All ingestion transforms must define a non-empty name. "
                f"Missing at indexes: {unnamed_positions}"
            )

        named = [
            transform for transform in self.transforms if transform.name is not None
        ]
        tally = Counter(transform.name for transform in named)
        repeated = sorted(name for name, count in tally.items() if count > 1)
        if repeated:
            raise ValueError(f"Duplicate ingestion transform names found: {repeated}")

        by_name = {transform.name: transform for transform in named}
        object.__setattr__(self, "_transforms", by_name)

    def finish_init(
        self,
        core_schema: CoreSchema,
        *,
        strict_references: bool = False,
        dynamic_edge_feedback: bool = False,
        allowed_vertex_names: set[str] | None = None,
        target_db_flavor: DBType | None = None,
    ) -> None:
        """Create one ``ResourceRuntime`` per configured resource.

        The lookup maps are rebuilt first so the runtimes see the current
        transform registry. The resulting name -> runtime map replaces any
        previously stored runtimes.

        Args:
            core_schema: Supplies ``vertex_config`` and ``edge_config`` for
                every runtime.
            strict_references: Forwarded unchanged to ``ResourceRuntime``.
            dynamic_edge_feedback: Forwarded unchanged to ``ResourceRuntime``.
            allowed_vertex_names: Forwarded unchanged to ``ResourceRuntime``.
            target_db_flavor: Forwarded unchanged to ``ResourceRuntime``.
        """
        self._rebuild_config_state()
        built: dict[str, ResourceRuntime] = {
            resource_config.name: ResourceRuntime(
                resource_config,
                vertex_config=core_schema.vertex_config,
                edge_config=core_schema.edge_config,
                transforms=self._transforms,
                strict_references=strict_references,
                dynamic_edge_feedback=dynamic_edge_feedback,
                allowed_vertex_names=allowed_vertex_names,
                target_db_flavor=target_db_flavor,
            )
            for resource_config in self.resources
        }
        object.__setattr__(self, "_runtimes", built)

    def _rebuild_config_state(self) -> None:
        """Refresh both lookup maps: transforms first, then resources."""
        self._rebuild_transform_map()
        self._rebuild_resource_map()

    def fetch_resource(self, name: str | None = None) -> ResourceRuntime:
        """Return a built runtime, by name or the first one when unnamed.

        Args:
            name: Resource name to look up. When ``None`` the first stored
                runtime is returned.

        Raises:
            ValueError: If ``name`` has no runtime, or if no name is given
                and the model has neither runtimes nor resources.
            RuntimeError: If no name is given and resources are configured
                but no runtimes have been built yet.
        """
        runtimes = self._runtimes
        if name is None:
            # Returning from the first iteration yields the first stored runtime.
            for first in runtimes.values():
                return first
            if not self.resources:
                raise ValueError("Empty resource container :(")
            raise RuntimeError(
                "IngestionModel resources exist but runtimes were not built; "
                "call finish_init() first."
            )

        found = runtimes.get(name)
        if found is None:
            raise ValueError(f"Resource {name} not found")
        return found

    def fetch_resource_config(self, name: str) -> ResourceConfig:
        """Return the declarative config registered under ``name``.

        Raises:
            ValueError: If no resource config has that name.
        """
        found = self._resources.get(name)
        if found is not None:
            return found
        raise ValueError(f"Resource {name} not found")

    def prune_to_graph(
        self, core_schema: CoreSchema, disconnected: set[str] | None = None
    ) -> None:
        """Drop resource actors that reference disconnected vertices.

        A resource is removed when its root actor references a disconnected
        vertex, or when, after matching descendants are removed from the
        actor tree built here, no remaining actor references any vertex.

        Args:
            core_schema: Used to derive the disconnected set when none is
                given.
            disconnected: Vertex names to treat as disconnected. Defaults to
                the vertex set minus the vertices known to the edge config.
        """
        if disconnected is None:
            all_vertices = core_schema.vertex_config.vertex_set
            disconnected = all_vertices - core_schema.edge_config.vertices
        if not disconnected:
            return

        def _touches_disconnected(wrapper: ActorWrapper) -> bool:
            return bool(wrapper.actor.references_vertices() & disconnected)

        doomed: list[ResourceConfig] = []
        for candidate in self.resources:
            tree = ActorWrapper(*candidate.pipeline)
            if not _touches_disconnected(tree):
                tree.remove_descendants_if(_touches_disconnected)
                remaining = tree.collect_actors()
                if any(actor.references_vertices() for actor in remaining):
                    # Still contributes to the graph: keep this resource.
                    continue
            doomed.append(candidate)

        if not doomed:
            return
        for resource_config in doomed:
            self.resources.remove(resource_config)
            self._resources.pop(resource_config.name, None)
            self._runtimes.pop(resource_config.name, None)
        self._rebuild_config_state()

fetch_resource(name=None)

Fetch an initialized runtime resource by name.

Source code in graflo/architecture/contract/ingestion/model.py
def fetch_resource(self, name: str | None = None) -> ResourceRuntime:
    """Return a built runtime, by name or the first one when unnamed.

    Args:
        name: Resource name to look up. When ``None`` the first stored
            runtime is returned.

    Raises:
        ValueError: If ``name`` has no runtime, or if no name is given and
            the model has neither runtimes nor resources.
        RuntimeError: If no name is given and resources are configured but
            no runtimes have been built yet.
    """
    runtimes = self._runtimes
    if name is None:
        # Returning from the first iteration yields the first stored runtime.
        for first in runtimes.values():
            return first
        if not self.resources:
            raise ValueError("Empty resource container :(")
        raise RuntimeError(
            "IngestionModel resources exist but runtimes were not built; "
            "call finish_init() first."
        )

    found = runtimes.get(name)
    if found is None:
        raise ValueError(f"Resource {name} not found")
    return found

fetch_resource_config(name)

Fetch declarative resource config by name.

Source code in graflo/architecture/contract/ingestion/model.py
def fetch_resource_config(self, name: str) -> ResourceConfig:
    """Return the declarative config registered under ``name``.

    Raises:
        ValueError: If no resource config has that name.
    """
    found = self._resources.get(name)
    if found is not None:
        return found
    raise ValueError(f"Resource {name} not found")

finish_init(core_schema, *, strict_references=False, dynamic_edge_feedback=False, allowed_vertex_names=None, target_db_flavor=None)

Build per-resource runtimes against graph model and transform library.

Source code in graflo/architecture/contract/ingestion/model.py
def finish_init(
    self,
    core_schema: CoreSchema,
    *,
    strict_references: bool = False,
    dynamic_edge_feedback: bool = False,
    allowed_vertex_names: set[str] | None = None,
    target_db_flavor: DBType | None = None,
) -> None:
    """Create one ``ResourceRuntime`` per configured resource.

    The lookup maps are rebuilt first so the runtimes see the current
    transform registry. The resulting name -> runtime map replaces any
    previously stored runtimes.

    Args:
        core_schema: Supplies ``vertex_config`` and ``edge_config`` for
            every runtime.
        strict_references: Forwarded unchanged to ``ResourceRuntime``.
        dynamic_edge_feedback: Forwarded unchanged to ``ResourceRuntime``.
        allowed_vertex_names: Forwarded unchanged to ``ResourceRuntime``.
        target_db_flavor: Forwarded unchanged to ``ResourceRuntime``.
    """
    self._rebuild_config_state()
    built: dict[str, ResourceRuntime] = {
        resource_config.name: ResourceRuntime(
            resource_config,
            vertex_config=core_schema.vertex_config,
            edge_config=core_schema.edge_config,
            transforms=self._transforms,
            strict_references=strict_references,
            dynamic_edge_feedback=dynamic_edge_feedback,
            allowed_vertex_names=allowed_vertex_names,
            target_db_flavor=target_db_flavor,
        )
        for resource_config in self.resources
    }
    object.__setattr__(self, "_runtimes", built)

prune_to_graph(core_schema, disconnected=None)

Drop resource actors that reference disconnected vertices.

Source code in graflo/architecture/contract/ingestion/model.py
def prune_to_graph(
    self, core_schema: CoreSchema, disconnected: set[str] | None = None
) -> None:
    """Drop resource actors that reference disconnected vertices.

    A resource is removed when its root actor references a disconnected
    vertex, or when, after matching descendants are removed from the actor
    tree built here, no remaining actor references any vertex.

    Args:
        core_schema: Used to derive the disconnected set when none is given.
        disconnected: Vertex names to treat as disconnected. Defaults to the
            vertex set minus the vertices known to the edge config.
    """
    if disconnected is None:
        all_vertices = core_schema.vertex_config.vertex_set
        disconnected = all_vertices - core_schema.edge_config.vertices
    if not disconnected:
        return

    def _touches_disconnected(wrapper: ActorWrapper) -> bool:
        return bool(wrapper.actor.references_vertices() & disconnected)

    doomed: list[ResourceConfig] = []
    for candidate in self.resources:
        tree = ActorWrapper(*candidate.pipeline)
        if not _touches_disconnected(tree):
            tree.remove_descendants_if(_touches_disconnected)
            remaining = tree.collect_actors()
            if any(actor.references_vertices() for actor in remaining):
                # Still contributes to the graph: keep this resource.
                continue
        doomed.append(candidate)

    if not doomed:
        return
    for resource_config in doomed:
        self.resources.remove(resource_config)
        self._resources.pop(resource_config.name, None)
        self._runtimes.pop(resource_config.name, None)
    self._rebuild_config_state()