
graflo.hq.auto_join

Auto-JOIN generation for edge resources.

When a Resource's pipeline contains an EdgeActor whose edge declares both match_source and match_target, and the source and target vertex types have known table connectors, this module can auto-generate JoinClauses and IS_NOT_NULL filters on the edge resource's table connector. The resulting SQL then returns only fully resolved rows: edge rows whose source and target references match an existing vertex row.
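To make the effect concrete, here is a minimal sketch of the SQL shape this produces for one edge table. `Join`, `render_select`, the base alias `e`, and the `follows`/`users` tables are hypothetical illustrations, not graflo's API; the SQL graflo actually emits may differ (for example in quoting or in how the base table is aliased).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Join:
    """Hypothetical stand-in for a JoinClause (not graflo's class)."""
    table: str
    schema_name: Optional[str]
    alias: str
    on_self: str   # column on the edge table
    on_other: str  # primary-key column on the joined vertex table
    join_type: str = "LEFT"


def render_select(edge_table: str, joins: list[Join]) -> str:
    """Render the approximate SQL shape of an auto-joined edge query."""
    lines = [f"SELECT * FROM {edge_table} AS e"]
    for j in joins:
        qualified = f"{j.schema_name}.{j.table}" if j.schema_name else j.table
        lines.append(
            f"{j.join_type} JOIN {qualified} AS {j.alias} "
            f"ON e.{j.on_self} = {j.alias}.{j.on_other}"
        )
    # One NOT-NULL guard per joined vertex key, combined with AND.
    guards = [f"{j.alias}.{j.on_other} IS NOT NULL" for j in joins]
    if guards:
        lines.append("WHERE " + " AND ".join(guards))
    return "\n".join(lines)


sql = render_select("follows", [
    Join("users", "public", "s", "follower_id", "id"),
    Join("users", "public", "t", "followee_id", "id"),
])
print(sql)
```

Each edge contributes one LEFT JOIN per endpoint plus one NOT-NULL guard on the joined primary key.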

enrich_edge_connector_with_joins(resource, connector, bindings, vertex_config)

Mutate connector in place, adding JoinClauses and IS_NOT_NULL filters.

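The function inspects the Resource's actor pipeline for EdgeActors and, for each edge that declares both match_source and match_target, looks up the source and target vertex table connectors and primary keys to construct LEFT JOINs and IS_NOT_NULL guards. Edges whose source or target vertex has no table connector are skipped. The connector's existing filters are preserved, and the new guards are appended to them.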

If the connector already has joins, this function is a no-op (the user provided explicit join specs).
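The selection rules described above can be summarized in a small stand-alone sketch. `EdgeSpec`, `edges_to_autojoin`, and the `vertex_tables` mapping are hypothetical stand-ins for graflo's EdgeActor/Edge objects and its `_vertex_table_info` lookup; the sketch only mirrors which edges would receive joins, not how the JoinClauses are built.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EdgeSpec:
    """Hypothetical stand-in for an edge definition (not graflo's Edge)."""
    source: str
    target: str
    match_source: Optional[str] = None
    match_target: Optional[str] = None


def edges_to_autojoin(
    existing_joins: list,
    edges: list[EdgeSpec],
    vertex_tables: dict[str, tuple[str, Optional[str], str]],
) -> list[EdgeSpec]:
    """Return the edges an auto-join pass would act on.

    vertex_tables maps a vertex type to (table, schema, primary_key);
    a missing key plays the role of the vertex lookup returning None.
    """
    if existing_joins:  # explicit joins win: the whole pass is a no-op
        return []
    selected = []
    for edge in edges:
        if not edge.match_source or not edge.match_target:
            continue  # both match fields are required
        if edge.source not in vertex_tables or edge.target not in vertex_tables:
            continue  # an endpoint has no table connector: skip this edge
        selected.append(edge)
    return selected


tables = {"user": ("users", "public", "id"), "repo": ("repos", "public", "id")}
edges = [
    EdgeSpec("user", "repo", "user_id", "repo_id"),  # qualifies
    EdgeSpec("user", "repo", "user_id"),             # no match_target
    EdgeSpec("user", "org", "user_id", "org_id"),    # no 'org' connector
]
print([(e.source, e.target) for e in edges_to_autojoin([], edges, tables)])
print(edges_to_autojoin(["explicit"], edges, tables))
```

Only the first edge qualifies, and passing any existing join short-circuits the pass entirely.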

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource | Resource | The Resource whose pipeline is inspected. | required |
| connector | TableConnector | The table connector to enrich (mutated in-place). | required |
| bindings | Bindings | The Bindings collection holding all vertex table connectors. | required |
| vertex_config | VertexConfig | VertexConfig for looking up primary keys. | required |

Source code in graflo/hq/auto_join.py
def enrich_edge_connector_with_joins(
    resource: Resource,
    connector: TableConnector,
    bindings: Bindings,
    vertex_config: VertexConfig,
) -> None:
    """Mutate *connector* in-place, adding JoinClauses + IS_NOT_NULL filters.

    The function inspects the Resource's actor pipeline for EdgeActors and,
    for each edge that declares ``match_source`` **and** ``match_target``,
    looks up the source / target vertex table connectors and primary keys to
    construct LEFT JOINs and NOT-NULL guards.

    If the connector already has joins, this function is a no-op (the user
    provided explicit join specs).

    Args:
        resource: The Resource whose pipeline is inspected.
        connector: The table connector to enrich (mutated in-place).
        bindings: The Bindings collection holding all vertex table connectors.
        vertex_config: VertexConfig for looking up primary keys.
    """
    # Explicit user-provided joins take precedence: leave the connector as-is.
    if connector.joins:
        return

    edge_actors = _collect_edge_actors(resource.root)
    if not edge_actors:
        return

    new_joins: list[JoinClause] = []
    new_filters: list[FilterExpression] = []

    for ea in edge_actors:
        edge = ea.edge
        if not edge.match_source or not edge.match_target:
            continue

        source_info = _vertex_table_info(edge.source, bindings, vertex_config)
        target_info = _vertex_table_info(edge.target, bindings, vertex_config)
        if source_info is None or target_info is None:
            logger.debug(
                "Skipping auto-join for edge %s->%s: missing vertex connector",
                edge.source,
                edge.target,
            )
            continue

        src_table, src_schema, src_pk = source_info
        tgt_table, tgt_schema, tgt_pk = target_info

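        # The aliases do not vary per edge: if several EdgeActors qualify,
        # their joins and filters reuse the same source/target aliases.
        src_alias = _SOURCE_ALIAS
        tgt_alias = _TARGET_ALIAS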

        new_joins.append(
            JoinClause(
                table=src_table,
                schema_name=src_schema,
                alias=src_alias,
                on_self=edge.match_source,
                on_other=src_pk,
                join_type="LEFT",
            )
        )
        new_joins.append(
            JoinClause(
                table=tgt_table,
                schema_name=tgt_schema,
                alias=tgt_alias,
                on_self=edge.match_target,
                on_other=tgt_pk,
                join_type="LEFT",
            )
        )

        new_filters.append(
            FilterExpression(
                kind="leaf",
                field=f"{src_alias}.{src_pk}",
                cmp_operator=ComparisonOperator.IS_NOT_NULL,
            )
        )
        new_filters.append(
            FilterExpression(
                kind="leaf",
                field=f"{tgt_alias}.{tgt_pk}",
                cmp_operator=ComparisonOperator.IS_NOT_NULL,
            )
        )

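    # Write back only if at least one edge qualified; existing filters are
    # kept and the IS_NOT_NULL guards are appended after them.
    if new_joins:
        connector.joins = new_joins
        connector.filters = list(connector.filters) + new_filters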
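Why LEFT JOINs plus IS_NOT_NULL guards yield fully resolved rows: assuming vertex primary keys are never NULL, the combination keeps exactly the edge rows whose match columns hit an existing vertex row, which is what an INNER JOIN would return. The sketch below simulates this in plain Python on made-up `edges`/`users`/`repos` data; `left_join` is a hypothetical helper, not part of graflo.

```python
from typing import Any, Optional

Row = dict[str, Any]


def left_join(rows: list[Row], others: list[Row], on_self: str,
              on_other: str, alias: str) -> list[Row]:
    """In-memory LEFT JOIN: keep every row, adding alias.on_other (None if unmatched)."""
    index = {o[on_other]: o for o in others}  # primary keys are unique
    out = []
    for row in rows:
        hit: Optional[Row] = index.get(row[on_self])
        joined = dict(row)
        joined[f"{alias}.{on_other}"] = hit[on_other] if hit is not None else None
        out.append(joined)
    return out


edges = [{"src": 1, "dst": 10}, {"src": 2, "dst": 99}, {"src": 3, "dst": 10}]
users = [{"id": 1}, {"id": 2}]
repos = [{"id": 10}]

joined = left_join(left_join(edges, users, "src", "id", "s"), repos, "dst", "id", "t")
# The NOT-NULL guards drop edges whose source or target did not resolve.
resolved = [r for r in joined if r["s.id"] is not None and r["t.id"] is not None]
print(resolved)  # [{'src': 1, 'dst': 10, 's.id': 1, 't.id': 10}]
```

Edge 2 is dropped because target 99 does not exist, and edge 3 because source 3 does not exist; the LEFT JOIN alone would have kept both with NULL vertex keys.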