Skip to content

graflo.hq.resource_mapper

Resource mapper for creating Patterns from different data sources.

This module provides functionality to create Patterns from various data sources (PostgreSQL, files, etc.) that can be used for graph ingestion.

ResourceMapper

Maps different data sources to Patterns for graph ingestion.

This class provides methods to create Patterns from various data sources, enabling a unified interface for pattern creation regardless of the source type.

Source code in graflo/hq/resource_mapper.py
class ResourceMapper:
    """Maps different data sources to Patterns for graph ingestion.

    This class provides methods to create Patterns from various data sources,
    enabling a unified interface for pattern creation regardless of the source type.
    """

    def create_patterns_from_postgres(
        self,
        conn: PostgresConnection,
        schema_name: str | None = None,
        datetime_columns: dict[str, str] | None = None,
    ) -> Patterns:
        """Create Patterns from PostgreSQL tables.

        Args:
            conn: PostgresConnection instance
            schema_name: Schema name to introspect
            datetime_columns: Optional mapping of resource/table name to datetime
                column name for date-range filtering (sets date_field on each
                TablePattern). Used with IngestionParams.datetime_after /
                datetime_before.

        Returns:
            Patterns: Patterns object with TablePattern instances for all tables
        """
        # Introspect the schema
        introspection_result = conn.introspect_schema(schema_name=schema_name)

        # Create patterns
        patterns = Patterns()

        # Get schema name
        effective_schema = schema_name or introspection_result.schema_name

        # Store the connection config
        config_key = "default"
        patterns.postgres_configs[(config_key, effective_schema)] = conn.config

        date_cols = datetime_columns or {}

        # Add patterns for vertex tables
        for table_info in introspection_result.vertex_tables:
            table_name = table_info.name
            table_pattern = TablePattern(
                table_name=table_name,
                schema_name=effective_schema,
                resource_name=table_name,
                date_field=date_cols.get(table_name),
            )
            patterns.table_patterns[table_name] = table_pattern
            patterns.postgres_table_configs[table_name] = (
                config_key,
                effective_schema,
                table_name,
            )

        # Add patterns for edge tables
        for table_info in introspection_result.edge_tables:
            table_name = table_info.name
            table_pattern = TablePattern(
                table_name=table_name,
                schema_name=effective_schema,
                resource_name=table_name,
                date_field=date_cols.get(table_name),
            )
            patterns.table_patterns[table_name] = table_pattern
            patterns.postgres_table_configs[table_name] = (
                config_key,
                effective_schema,
                table_name,
            )

        return patterns

create_patterns_from_postgres(conn, schema_name=None, datetime_columns=None)

Create Patterns from PostgreSQL tables.

Parameters:

Name Type Description Default
conn PostgresConnection

PostgresConnection instance

required
schema_name str | None

Schema name to introspect

None
datetime_columns dict[str, str] | None

Optional mapping of resource/table name to datetime column name for date-range filtering (sets date_field on each TablePattern). Used with IngestionParams.datetime_after / datetime_before.

None

Returns:

Name Type Description
Patterns Patterns

Patterns object with TablePattern instances for all tables

Source code in graflo/hq/resource_mapper.py
def create_patterns_from_postgres(
    self,
    conn: PostgresConnection,
    schema_name: str | None = None,
    datetime_columns: dict[str, str] | None = None,
) -> Patterns:
    """Create Patterns from PostgreSQL tables.

    Args:
        conn: PostgresConnection instance
        schema_name: Schema name to introspect
        datetime_columns: Optional mapping of resource/table name to datetime
            column name for date-range filtering (sets date_field on each
            TablePattern). Used with IngestionParams.datetime_after /
            datetime_before.

    Returns:
        Patterns: Patterns object with TablePattern instances for all tables
    """
    # Introspect the schema
    introspection_result = conn.introspect_schema(schema_name=schema_name)

    # Create patterns
    patterns = Patterns()

    # Get schema name
    effective_schema = schema_name or introspection_result.schema_name

    # Store the connection config
    config_key = "default"
    patterns.postgres_configs[(config_key, effective_schema)] = conn.config

    date_cols = datetime_columns or {}

    # Add patterns for vertex tables
    for table_info in introspection_result.vertex_tables:
        table_name = table_info.name
        table_pattern = TablePattern(
            table_name=table_name,
            schema_name=effective_schema,
            resource_name=table_name,
            date_field=date_cols.get(table_name),
        )
        patterns.table_patterns[table_name] = table_pattern
        patterns.postgres_table_configs[table_name] = (
            config_key,
            effective_schema,
            table_name,
        )

    # Add patterns for edge tables
    for table_info in introspection_result.edge_tables:
        table_name = table_info.name
        table_pattern = TablePattern(
            table_name=table_name,
            schema_name=effective_schema,
            resource_name=table_name,
            date_field=date_cols.get(table_name),
        )
        patterns.table_patterns[table_name] = table_pattern
        patterns.postgres_table_configs[table_name] = (
            config_key,
            effective_schema,
            table_name,
        )

    return patterns