Skip to content

graflo.hq.inferencer

InferenceManager

Inference manager for PostgreSQL sources.

Source code in graflo/hq/inferencer.py
class InferenceManager:
    """Coordinate schema inference for PostgreSQL sources.

    Bundles a PostgreSQL connection, a schema inferencer, a schema sanitizer
    and a resource mapper behind a single entry point.
    """

    def __init__(
        self,
        conn: PostgresConnection,
        target_db_flavor: DBType = DBType.ARANGO,
        fuzzy_threshold: float = 0.8,
    ):
        """Wire up the collaborators used for inference.

        Args:
            conn: PostgresConnection instance; stored and also handed to the
                schema inferencer.
            target_db_flavor: Target database flavor; used to build the
                sanitizer and passed to the inferencer as ``db_flavor``.
            fuzzy_threshold: Similarity threshold for fuzzy matching
                (0.0 to 1.0, default 0.8), forwarded to the resource mapper.
        """
        self.conn = conn
        self.target_db_flavor = target_db_flavor
        self.sanitizer = SchemaSanitizer(target_db_flavor)
        self.inferencer = PostgresSchemaInferencer(
            conn=conn,
            db_flavor=target_db_flavor,
        )
        self.mapper = PostgresResourceMapper(fuzzy_threshold=fuzzy_threshold)

    def introspect(self, schema_name: str | None = None) -> SchemaIntrospectionResult:
        """Run schema introspection through the stored connection.

        Args:
            schema_name: Schema name to introspect; passed through unchanged.

        Returns:
            SchemaIntrospectionResult: Whatever ``conn.introspect_schema``
            produces for ``schema_name``.
        """
        connection = self.conn
        return connection.introspect_schema(schema_name=schema_name)

    def infer_schema(
        self, introspection_result, schema_name: str | None = None
    ) -> Schema:
        """Turn an introspection result into a graflo Schema.

        Args:
            introspection_result: SchemaIntrospectionResult from PostgreSQL
            schema_name: Optional schema name, forwarded to the inferencer

        Returns:
            Schema: The schema produced by the underlying inferencer
        """
        inferred = self.inferencer.infer_schema(
            introspection_result,
            schema_name=schema_name,
        )
        return inferred

    def create_resources(
        self, introspection_result, schema: Schema
    ) -> list["Resource"]:
        """Build Resources for the introspected tables against a schema.

        Args:
            introspection_result: SchemaIntrospectionResult from PostgreSQL
            schema: Existing Schema object; its ``vertex_config`` and
                ``edge_config`` are passed to the mapper

        Returns:
            list[Resource]: Resources returned by the mapper's
            ``create_resources_from_tables``
        """
        mapper = self.mapper
        vertices = schema.vertex_config
        edges = schema.edge_config
        attribute_mappings = self.sanitizer.vertex_attribute_mappings
        threshold = mapper.fuzzy_threshold
        return mapper.create_resources_from_tables(
            introspection_result,
            vertices,
            edges,
            vertex_attribute_mappings=attribute_mappings,
            fuzzy_threshold=threshold,
        )

    def infer_complete_schema(self, schema_name: str | None = None) -> Schema:
        """Produce a fully populated Schema in one call.

        Steps, in order:
        1. Introspect the source schema
        2. Infer the graflo Schema
        3. Sanitize it via the configured sanitizer
        4. Create resources and attach them to the schema
        5. Call ``finish_init()`` on the schema

        Args:
            schema_name: Schema name to introspect (source-specific)

        Returns:
            Schema: The sanitized schema with ``resources`` assigned
        """
        introspected = self.introspect(schema_name=schema_name)
        raw_schema = self.infer_schema(introspected, schema_name=schema_name)

        # The sanitizer's return value replaces the inferred schema from here on.
        sanitized = self.sanitizer.sanitize(raw_schema)
        sanitized.resources = self.create_resources(introspected, sanitized)

        # Re-initialize so the freshly attached resources are picked up.
        sanitized.finish_init()
        return sanitized

    def create_resources_for_schema(
        self, schema: Schema, schema_name: str | None = None
    ) -> list["Resource"]:
        """Introspect the source, then build Resources for a given schema.

        Args:
            schema: Existing Schema object
            schema_name: Schema name to introspect (source-specific)

        Returns:
            list[Resource]: Result of ``create_resources`` on a fresh
            introspection of ``schema_name``
        """
        return self.create_resources(
            self.introspect(schema_name=schema_name),
            schema,
        )

__init__(conn, target_db_flavor=DBType.ARANGO, fuzzy_threshold=0.8)

Initialize the PostgreSQL inference manager.

Parameters:

Name Type Description Default
conn PostgresConnection

PostgresConnection instance

required
target_db_flavor DBType

Target database flavor for schema sanitization

ARANGO
fuzzy_threshold float

Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8)

0.8
Source code in graflo/hq/inferencer.py
def __init__(
    self,
    conn: PostgresConnection,
    target_db_flavor: DBType = DBType.ARANGO,
    fuzzy_threshold: float = 0.8,
):
    """Wire up the collaborators used for inference.

    Args:
        conn: PostgresConnection instance; stored and also handed to the
            schema inferencer.
        target_db_flavor: Target database flavor; used to build the
            sanitizer and passed to the inferencer as ``db_flavor``.
        fuzzy_threshold: Similarity threshold for fuzzy matching
            (0.0 to 1.0, default 0.8), forwarded to the resource mapper.
    """
    self.conn = conn
    self.target_db_flavor = target_db_flavor
    self.sanitizer = SchemaSanitizer(target_db_flavor)
    self.inferencer = PostgresSchemaInferencer(
        conn=conn,
        db_flavor=target_db_flavor,
    )
    self.mapper = PostgresResourceMapper(fuzzy_threshold=fuzzy_threshold)

create_resources(introspection_result, schema)

Create Resources from PostgreSQL introspection result.

Parameters:

Name Type Description Default
introspection_result

SchemaIntrospectionResult from PostgreSQL

required
schema Schema

Existing Schema object

required

Returns:

Type Description
list[Resource]

List of Resources for PostgreSQL tables

Source code in graflo/hq/inferencer.py
def create_resources(
    self, introspection_result, schema: Schema
) -> list["Resource"]:
    """Build Resources for the introspected tables against a schema.

    Args:
        introspection_result: SchemaIntrospectionResult from PostgreSQL
        schema: Existing Schema object; its ``vertex_config`` and
            ``edge_config`` are passed to the mapper

    Returns:
        list[Resource]: Resources returned by the mapper's
        ``create_resources_from_tables``
    """
    mapper = self.mapper
    vertices = schema.vertex_config
    edges = schema.edge_config
    attribute_mappings = self.sanitizer.vertex_attribute_mappings
    threshold = mapper.fuzzy_threshold
    return mapper.create_resources_from_tables(
        introspection_result,
        vertices,
        edges,
        vertex_attribute_mappings=attribute_mappings,
        fuzzy_threshold=threshold,
    )

create_resources_for_schema(schema, schema_name=None)

Create Resources from source for an existing schema.

Parameters:

Name Type Description Default
schema Schema

Existing Schema object

required
schema_name str | None

Schema name to introspect (source-specific)

None

Returns:

Type Description
list[Resource]

List of Resources for the source

Source code in graflo/hq/inferencer.py
def create_resources_for_schema(
    self, schema: Schema, schema_name: str | None = None
) -> list["Resource"]:
    """Introspect the source, then build Resources for a given schema.

    Args:
        schema: Existing Schema object
        schema_name: Schema name to introspect (source-specific)

    Returns:
        list[Resource]: Result of ``create_resources`` on a fresh
        introspection of ``schema_name``
    """
    return self.create_resources(
        self.introspect(schema_name=schema_name),
        schema,
    )

infer_complete_schema(schema_name=None)

Infer a complete Schema from source and sanitize for target.

This is a convenience method that:

1. Introspects the source schema
2. Infers the graflo Schema
3. Sanitizes for the target database flavor
4. Creates and adds resources
5. Re-initializes the schema

Parameters:

Name Type Description Default
schema_name str | None

Schema name to introspect (source-specific)

None

Returns:

Name Type Description
Schema Schema

Complete inferred schema with vertices, edges, and resources

Source code in graflo/hq/inferencer.py
def infer_complete_schema(self, schema_name: str | None = None) -> Schema:
    """Produce a fully populated Schema in one call.

    Steps, in order:
    1. Introspect the source schema
    2. Infer the graflo Schema
    3. Sanitize it via the configured sanitizer
    4. Create resources and attach them to the schema
    5. Call ``finish_init()`` on the schema

    Args:
        schema_name: Schema name to introspect (source-specific)

    Returns:
        Schema: The sanitized schema with ``resources`` assigned
    """
    introspected = self.introspect(schema_name=schema_name)
    raw_schema = self.infer_schema(introspected, schema_name=schema_name)

    # The sanitizer's return value replaces the inferred schema from here on.
    sanitized = self.sanitizer.sanitize(raw_schema)
    sanitized.resources = self.create_resources(introspected, sanitized)

    # Re-initialize so the freshly attached resources are picked up.
    sanitized.finish_init()
    return sanitized

infer_schema(introspection_result, schema_name=None)

Infer graflo Schema from PostgreSQL introspection result.

Parameters:

Name Type Description Default
introspection_result

SchemaIntrospectionResult from PostgreSQL

required
schema_name str | None

Schema name (optional, may be inferred from result)

None

Returns:

Name Type Description
Schema Schema

Inferred schema with vertices and edges

Source code in graflo/hq/inferencer.py
def infer_schema(
    self, introspection_result, schema_name: str | None = None
) -> Schema:
    """Turn an introspection result into a graflo Schema.

    Args:
        introspection_result: SchemaIntrospectionResult from PostgreSQL
        schema_name: Optional schema name, forwarded to the inferencer

    Returns:
        Schema: The schema produced by the underlying inferencer
    """
    inferred = self.inferencer.infer_schema(
        introspection_result,
        schema_name=schema_name,
    )
    return inferred

introspect(schema_name=None)

Introspect PostgreSQL schema.

Parameters:

Name Type Description Default
schema_name str | None

Schema name to introspect

None

Returns:

Name Type Description
SchemaIntrospectionResult SchemaIntrospectionResult

PostgreSQL schema introspection result

Source code in graflo/hq/inferencer.py
def introspect(self, schema_name: str | None = None) -> SchemaIntrospectionResult:
    """Run schema introspection through the stored connection.

    Args:
        schema_name: Schema name to introspect; passed through unchanged.

    Returns:
        SchemaIntrospectionResult: Whatever ``conn.introspect_schema``
        produces for ``schema_name``.
    """
    connection = self.conn
    return connection.introspect_schema(schema_name=schema_name)