graphcast.architecture.resource

Resource management and processing for graph databases.

This module provides the core resource handling functionality for graph databases. It defines how data resources are processed, transformed, and mapped to graph structures through a system of actors and transformations.

Key Components
  • Resource: Main class for resource processing and transformation
  • ActorWrapper: Wrapper for processing actors
  • ActionContext: Context for processing actions

The resource system allows for:
  • Data encoding and transformation
  • Vertex and edge creation
  • Weight management
  • Collection merging
  • Type casting and validation

Example

resource = Resource(
    resource_name="users",
    apply=[VertexActor("user"), EdgeActor("follows")],
    encoding=EncodingType.UTF_8,
)
result = resource(doc)

Resource dataclass

Bases: BaseDataclass, JSONWizard

Resource configuration and processing.

This class represents a data resource that can be processed and transformed into graph structures. It manages the processing pipeline through actors and handles data encoding, transformation, and mapping.

Attributes:

Name | Type | Description
resource_name | str | Name of the resource
apply | list | List of actors to apply in sequence
encoding | EncodingType | Data encoding type (default: UTF_8)
merge_collections | list[str] | List of collections to merge (default: empty list)
extra_weights | list[Edge] | List of additional edge weights (default: empty list)
types | dict[str, str] | Mapping from field name to a type expression, evaluated into a callable in __post_init__ (default: empty dict)
root | ActorWrapper | Root actor wrapper for processing, built from apply in __post_init__
vertex_config | VertexConfig | Configuration for vertices, set by finish_init
edge_config | EdgeConfig | Configuration for edges, set by finish_init

Source code in graphcast/architecture/resource.py
@dataclasses.dataclass(kw_only=True)
class Resource(BaseDataclass, JSONWizard):
    """Resource configuration and processing.

    This class represents a data resource that can be processed and transformed
    into graph structures. It manages the processing pipeline through actors
    and handles data encoding, transformation, and mapping.

    Attributes:
        resource_name: Name of the resource
        apply: List of actors to apply in sequence
        encoding: Data encoding type (default: UTF_8)
        merge_collections: List of collections to merge
        extra_weights: List of additional edge weights
        types: Dictionary of field type mappings
        root: Root actor wrapper for processing
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
    """

    resource_name: str
    apply: list
    encoding: EncodingType = EncodingType.UTF_8
    merge_collections: list[str] = dataclasses.field(default_factory=list)
    extra_weights: list[Edge] = dataclasses.field(default_factory=list)
    types: dict[str, str] = dataclasses.field(default_factory=dict)

    def __post_init__(self):
        """Initialize the resource after dataclass initialization.

        Sets up the actor wrapper and type mappings. Evaluates type expressions
        for field type casting.

        Note:
            A type expression that fails to evaluate is logged as an error
            and skipped; the exception is not re-raised.
        """
        self.root = ActorWrapper(*self.apply)
        self._types: dict[str, Callable] = dict()
        self.vertex_config: VertexConfig
        self.edge_config: EdgeConfig
        for k, v in self.types.items():
            try:
                self._types[k] = eval(v)
            except Exception as ex:
                logger.error(
                    f"For resource {self.name} for field {k} failed to cast type {v} : {ex}"
                )

    @property
    def name(self):
        """Get the resource name.

        Returns:
            str: Name of the resource
        """
        return self.resource_name

    def finish_init(
        self,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
        transforms: dict[str, ProtoTransform],
    ):
        """Complete resource initialization.

        Initializes the resource with vertex and edge configurations,
        and sets up the processing pipeline.

        Args:
            vertex_config: Configuration for vertices
            edge_config: Configuration for edges
            transforms: Dictionary of available transforms
        """
        self.vertex_config = vertex_config
        self.edge_config = edge_config

        logger.debug(f"total resource actor count : {self.root.count()}")
        self.root.finish_init(
            vertex_config=vertex_config,
            transforms=transforms,
            edge_config=edge_config,
        )

        logger.debug(f"total resource actor count (after 2 finit): {self.root.count()}")

        for e in self.extra_weights:
            e.finish_init(vertex_config)

    def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
        """Process a document through the resource pipeline.

        Args:
            doc: Document to process

        Returns:
            defaultdict[GraphEntity, list]: Processed graph entities
        """
        ctx = ActionContext()
        ctx = self.root(ctx, doc=doc)
        acc = self.root.normalize_ctx(ctx)
        return acc

    def count(self):
        """Get the total number of actors in the resource.

        Returns:
            int: Number of actors
        """
        return self.root.count()

name property

Get the resource name.

Returns:

Type | Description
str | Name of the resource

__call__(doc)

Process a document through the resource pipeline.

Parameters:

Name | Type | Description | Default
doc | dict | Document to process | required

Returns:

Type | Description
defaultdict[GraphEntity, list] | Processed graph entities

Source code in graphcast/architecture/resource.py
def __call__(self, doc: dict) -> defaultdict[GraphEntity, list]:
    """Process a document through the resource pipeline.

    Args:
        doc: Document to process

    Returns:
        defaultdict[GraphEntity, list]: Processed graph entities
    """
    ctx = ActionContext()
    ctx = self.root(ctx, doc=doc)
    acc = self.root.normalize_ctx(ctx)
    return acc
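
The three steps in `__call__` (create a context, pass it through the root actor, normalize the result) can be illustrated with a minimal, self-contained sketch. The classes `SketchContext`, `SketchVertexActor`, and `SketchRoot` are hypothetical stand-ins written for this example; they are not the library's `ActionContext` or `ActorWrapper`, whose internals are not shown on this page.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class SketchContext:
    """Hypothetical stand-in for ActionContext: holds accumulated entities."""
    acc: defaultdict = field(default_factory=lambda: defaultdict(list))


class SketchVertexActor:
    """Hypothetical actor: copies selected document fields into a vertex."""

    def __init__(self, name, fields):
        self.name = name
        self.fields = fields

    def __call__(self, ctx, doc):
        ctx.acc[self.name].append({k: doc[k] for k in self.fields if k in doc})
        return ctx


class SketchRoot:
    """Hypothetical stand-in for ActorWrapper: applies actors in sequence."""

    def __init__(self, *actors):
        self.actors = actors

    def __call__(self, ctx, doc):
        for actor in self.actors:
            ctx = actor(ctx, doc=doc)
        return ctx

    def normalize_ctx(self, ctx):
        # Assumption: normalization simply exposes the accumulated mapping.
        return ctx.acc


root = SketchRoot(
    SketchVertexActor("user", ["id", "name"]),
    SketchVertexActor("city", ["city"]),
)
ctx = root(SketchContext(), doc={"id": 1, "name": "Ada", "city": "London"})
result = root.normalize_ctx(ctx)
print(dict(result))
# {'user': [{'id': 1, 'name': 'Ada'}], 'city': [{'city': 'London'}]}
```

Each actor receives the context produced by the previous one, so the final mapping collects the output of the whole sequence for a single document.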

__post_init__()

Initialize the resource after dataclass initialization.

Sets up the actor wrapper and type mappings. Evaluates type expressions for field type casting.

A type expression that fails to evaluate is logged as an error and skipped; the exception is not re-raised.

Source code in graphcast/architecture/resource.py
def __post_init__(self):
    """Initialize the resource after dataclass initialization.

    Sets up the actor wrapper and type mappings. Evaluates type expressions
    for field type casting.

    Note:
        A type expression that fails to evaluate is logged as an error
        and skipped; the exception is not re-raised.
    """
    self.root = ActorWrapper(*self.apply)
    self._types: dict[str, Callable] = dict()
    self.vertex_config: VertexConfig
    self.edge_config: EdgeConfig
    for k, v in self.types.items():
        try:
            self._types[k] = eval(v)
        except Exception as ex:
            logger.error(
                f"For resource {self.name} for field {k} failed to cast type {v} : {ex}"
            )
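
The type-mapping loop above can be tried in isolation. The sketch below is a standalone rewrite of that loop, not part of the library: the function name `build_casters` and the sample field names are assumptions made for illustration.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def build_casters(types: dict) -> dict:
    """Turn {"field": "int"} style mappings into {"field": int}.

    As in __post_init__, each expression is evaluated; a failure is
    logged and the field is skipped rather than raising.
    """
    casters: dict[str, Callable] = {}
    for field_name, expr in types.items():
        try:
            casters[field_name] = eval(expr)  # trusted configuration only
        except Exception as ex:
            logger.error(f"field {field_name}: failed to cast type {expr} : {ex}")
    return casters


casters = build_casters({"age": "int", "score": "float", "bad": "no_such_type"})
print(sorted(casters))       # ['age', 'score']
print(casters["age"]("42"))  # 42
```

Because `eval` executes arbitrary expressions, the strings in `types` should come only from configuration that is trusted.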

count()

Get the total number of actors in the resource.

Returns:

Type | Description
int | Number of actors

Source code in graphcast/architecture/resource.py
def count(self):
    """Get the total number of actors in the resource.

    Returns:
        int: Number of actors
    """
    return self.root.count()

finish_init(vertex_config, edge_config, transforms)

Complete resource initialization.

Initializes the resource with vertex and edge configurations, and sets up the processing pipeline.

Parameters:

Name | Type | Description | Default
vertex_config | VertexConfig | Configuration for vertices | required
edge_config | EdgeConfig | Configuration for edges | required
transforms | dict[str, ProtoTransform] | Dictionary of available transforms | required

Source code in graphcast/architecture/resource.py
def finish_init(
    self,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
    transforms: dict[str, ProtoTransform],
):
    """Complete resource initialization.

    Initializes the resource with vertex and edge configurations,
    and sets up the processing pipeline.

    Args:
        vertex_config: Configuration for vertices
        edge_config: Configuration for edges
        transforms: Dictionary of available transforms
    """
    self.vertex_config = vertex_config
    self.edge_config = edge_config

    logger.debug(f"total resource actor count : {self.root.count()}")
    self.root.finish_init(
        vertex_config=vertex_config,
        transforms=transforms,
        edge_config=edge_config,
    )

    logger.debug(f"total resource actor count (after 2 finit): {self.root.count()}")

    for e in self.extra_weights:
        e.finish_init(vertex_config)
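
`finish_init` follows a two-phase initialization pattern: the object is first built from its own settings, then completed once shared configuration is available. The sketch below shows the pattern with hypothetical stand-ins (`SketchActor`, `SketchResource`, plain dicts as configuration); it makes no claim about what the library's actors do with the configuration they receive.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchActor:
    """Hypothetical actor that needs shared configuration before it can run."""
    name: str
    vertex_config: Optional[dict] = None
    edge_config: Optional[dict] = None

    def finish_init(self, vertex_config: dict, edge_config: dict) -> None:
        self.vertex_config = vertex_config
        self.edge_config = edge_config


@dataclass
class SketchResource:
    """Hypothetical resource: built from its own settings, completed later."""
    resource_name: str
    actors: list = field(default_factory=list)
    ready: bool = False

    def finish_init(self, vertex_config: dict, edge_config: dict) -> None:
        # Phase 2: push the shared configuration down to every actor.
        for actor in self.actors:
            actor.finish_init(vertex_config, edge_config)
        self.ready = True


# Phase 1: construct from resource-local settings only.
res = SketchResource("users", actors=[SketchActor("user"), SketchActor("follows")])

# Phase 2: supply configuration that is shared across resources.
shared_vertices = {"user": ["id", "name"]}
shared_edges = {"follows": ("user", "user")}
res.finish_init(shared_vertices, shared_edges)

print(res.ready)  # True
print(all(a.vertex_config is shared_vertices for a in res.actors))  # True
```

Splitting construction this way lets a resource be deserialized on its own and wired to the shared vertex and edge configuration afterwards.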