Skip to content

graflo.architecture.graph_types.context

Extraction and assembly runtime contexts.

ActionContext

Bases: ExtractionContext

Backward-compatible extraction+assembly context.

Kept for existing callers/tests while Wave 5 migrates call surfaces.

Source code in graflo/architecture/graph_types/context.py
class ActionContext(ExtractionContext):
    """Backward-compatible extraction+assembly context.

    Kept for existing callers/tests while Wave 5 migrates call surfaces.
    """

    acc_global: Any = Field(default_factory=dd_factory)

AssemblyContext

Bases: ConfigBaseModel

Assembly-phase context built from extraction outputs.

Source code in graflo/architecture/graph_types/context.py
class AssemblyContext(ConfigBaseModel):
    """Assembly-phase context built from extraction outputs."""

    model_config = ConfigDict(kw_only=True)  # ty: ignore[invalid-key]

    extraction: ExtractionContext
    acc_global: Any = Field(default_factory=dd_factory)

    @property
    def acc_vertex(self) -> Any:
        return self.extraction.acc_vertex

    @property
    def transform_buffer(self) -> Any:
        return self.extraction.transform_buffer

    @property
    def obs_buffer(self) -> Any:
        return self.extraction.obs_buffer

    @property
    def edge_intents(self) -> list[EdgeIntent]:
        return self.extraction.edge_intents

    @classmethod
    def from_extraction(cls, extraction: ExtractionContext) -> AssemblyContext:
        return cls(extraction=extraction)

EdgeIntent

Bases: ConfigBaseModel

Typed edge assembly request emitted during extraction.

Source code in graflo/architecture/graph_types/context.py
class EdgeIntent(ConfigBaseModel):
    """Typed edge assembly request emitted during extraction."""

    edge: Any
    location: LocationIndex | None = None
    provenance: ProvenancePath | None = None
    derivation: EdgeDerivation | None = None

ExtractionContext

Bases: ConfigBaseModel

Extraction-phase context.

Attributes:

Name Type Description
acc_vertex Any

Local accumulation of extracted vertices

transform_buffer Any

Buffer for transform payloads (defaultdict[LocationIndex, list])

obs_buffer Any

Merged observation context per location (dict[LocationIndex, dict])

vertex_observations list[VertexObservation]

Explicit extracted vertex observations

transform_observations list[TransformObservation]

Explicit extracted transform observations

edge_intents list[EdgeIntent]

Explicit edge intents for assembly phase

Source code in graflo/architecture/graph_types/context.py
class ExtractionContext(ConfigBaseModel):
    """Extraction-phase context.

    Attributes:
        acc_vertex: Local accumulation of extracted vertices
        transform_buffer: Buffer for transform payloads (defaultdict[LocationIndex, list])
        obs_buffer: Merged observation context per location (dict[LocationIndex, dict])
        vertex_observations: Explicit extracted vertex observations
        transform_observations: Explicit extracted transform observations
        edge_intents: Explicit edge intents for assembly phase
    """

    model_config = ConfigDict(kw_only=True)  # ty: ignore[invalid-key]

    # Pydantic cannot schema nested defaultdict with custom key types (e.g. LocationIndex),
    # so we use Any; runtime type is as documented in Attributes
    acc_vertex: Any = Field(default_factory=outer_factory)
    transform_buffer: Any = Field(default_factory=_default_dict_transforms)
    obs_buffer: Any = Field(default_factory=_default_dict_observations)
    vertex_observations: list[VertexObservation] = Field(
        default_factory=_default_vertex_observations
    )
    transform_observations: list[TransformObservation] = Field(
        default_factory=_default_transform_observations
    )
    edge_intents: list[EdgeIntent] = Field(default_factory=_default_edge_intents)
    transform_failures: list[TransformCastFailure] = Field(
        default_factory=_default_transform_failures
    )

    def record_vertex_observation(
        self, *, vertex_name: str, location: LocationIndex, vertex: dict, ctx: dict
    ) -> None:
        self.vertex_observations.append(
            VertexObservation(
                vertex_name=vertex_name,
                location=location,
                vertex=vertex,
                ctx=ctx,
                provenance=ProvenancePath.from_lindex(location),
            )
        )

    def record_transform_observation(
        self, *, location: LocationIndex, payload: TransformPayload
    ) -> None:
        self.transform_observations.append(
            TransformObservation(
                location=location,
                payload=payload,
                provenance=ProvenancePath.from_lindex(location),
            )
        )

    def record_edge_intent(
        self,
        *,
        edge: Any,
        location: LocationIndex,
        derivation: EdgeDerivation | None = None,
    ) -> None:
        self.edge_intents.append(
            EdgeIntent(
                edge=edge,
                location=location,
                provenance=ProvenancePath.from_lindex(location),
                derivation=derivation,
            )
        )

    def record_transform_failure(
        self,
        *,
        location: LocationIndex,
        transform_label: str,
        exc: BaseException,
        traceback_text: str,
        nulled_fields: tuple[str, ...],
    ) -> None:
        self.transform_failures.append(
            TransformCastFailure(
                location=location,
                transform_label=transform_label,
                exception_type=type(exc).__name__,
                message=str(exc),
                traceback=traceback_text,
                nulled_fields=nulled_fields,
            )
        )

GraphAssemblyResult

Bases: ConfigBaseModel

Result of graph assembly phase.

Source code in graflo/architecture/graph_types/context.py
class GraphAssemblyResult(ConfigBaseModel):
    """Result of graph assembly phase."""

    entities: Any = Field(default_factory=dd_factory)

ResourceCastResult

Bases: ConfigBaseModel

Outcome of casting one document through a resource pipeline.

Source code in graflo/architecture/graph_types/context.py
class ResourceCastResult(ConfigBaseModel):
    """Outcome of casting one document through a resource pipeline."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    entities: Any
    transform_failures: list[TransformCastFailure] = Field(default_factory=list)

TransformCastFailure

Bases: ConfigBaseModel

One transform step that failed during extraction (tolerance mode).

Source code in graflo/architecture/graph_types/context.py
class TransformCastFailure(ConfigBaseModel):
    """One transform step that failed during extraction (tolerance mode)."""

    location: LocationIndex
    transform_label: str
    exception_type: str
    message: str
    traceback: str = ""
    nulled_fields: tuple[str, ...] = Field(default_factory=tuple)

TransformObservation

Bases: ConfigBaseModel

Typed transform observation emitted during extraction.

Source code in graflo/architecture/graph_types/context.py
class TransformObservation(ConfigBaseModel):
    """Typed transform observation emitted during extraction."""

    location: LocationIndex
    payload: TransformPayload
    provenance: ProvenancePath

VertexObservation

Bases: ConfigBaseModel

Typed vertex observation emitted during extraction.

Source code in graflo/architecture/graph_types/context.py
class VertexObservation(ConfigBaseModel):
    """Typed vertex observation emitted during extraction."""

    vertex_name: str
    location: LocationIndex
    vertex: dict[str, Any]
    ctx: dict[str, Any]
    provenance: ProvenancePath

dd_factory()

Create a default dictionary for graph entity data.

Source code in graflo/architecture/graph_types/context.py
def dd_factory() -> defaultdict[GraphEntity, list]:
    """Create a default dictionary for graph entity data."""
    return defaultdict(list)

inner_factory_vertex()

Create a default dictionary for vertex data.

Source code in graflo/architecture/graph_types/context.py
def inner_factory_vertex() -> defaultdict[LocationIndex, list]:
    """Create a default dictionary for vertex data."""
    return defaultdict(list)

outer_factory()

Create a nested default dictionary for vertex data.

Source code in graflo/architecture/graph_types/context.py
def outer_factory() -> defaultdict[str, defaultdict[LocationIndex, list]]:
    """Create a nested default dictionary for vertex data."""
    return defaultdict(inner_factory_vertex)