Skip to content

graflo.hq.rdf_inferencer

RDF / OWL ontology inference manager.

Reads the TBox (class and property definitions) from an RDF source and produces a graflo `Schema` with vertices, edges, resources, and `Bindings`.

The mapping follows these conventions:

  • owl:Class / rdfs:Class -> Vertex
  • owl:DatatypeProperty (rdfs:domain) -> Field on the domain vertex
  • owl:ObjectProperty (rdfs:domain, rdfs:range) -> Edge (source = domain class, target = range class)
  • Subject URI local name -> _key

Requires rdflib (a core dependency of graflo).

RdfInferenceManager

Infer a graflo `Schema` from an RDF / OWL ontology.

The manager reads the TBox (class and property declarations) from an rdflib Graph and constructs the corresponding graflo artefacts.

Attributes:

Name Type Description
target_db_flavor DBType

Target graph-database flavour for downstream schema sanitisation.

Source code in graflo/hq/rdf_inferencer.py
class RdfInferenceManager:
    """Infer a graflo :class:`Schema` from an RDF / OWL ontology.

    The manager reads the TBox (class and property declarations) from an
    rdflib ``Graph`` and constructs the corresponding graflo artefacts.

    Attributes:
        target_db_flavor: Target graph-database flavour for downstream
            schema sanitisation.
    """

    def __init__(self, target_db_flavor: DBType = DBType.ARANGO):
        self.target_db_flavor = target_db_flavor

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def infer_schema(
        self,
        source: str | Path,
        *,
        endpoint_url: str | None = None,
        graph_uri: str | None = None,
        schema_name: str | None = None,
    ) -> tuple[Schema, IngestionModel]:
        """Infer a complete graflo Schema from an RDF/OWL ontology.

        Classes become vertices, datatype properties become fields on their
        ``rdfs:domain`` vertex, and object properties become edges between
        every (domain, range) pair whose both ends are discovered classes.
        One resource is created per class; its pipeline contains the vertex
        step followed by the edges that originate from that class.

        Args:
            source: Path to an RDF file or a base URL (when using endpoint).
            endpoint_url: SPARQL endpoint to CONSTRUCT the ontology from.
            graph_uri: Named graph containing the ontology.
            schema_name: Name for the resulting schema; falls back to
                ``"rdf_schema"`` when omitted or empty.

        Returns:
            tuple[Schema, IngestionModel]: fully initialised schema and ingestion model.
        """
        from rdflib import OWL, RDF, RDFS

        g = _load_graph(source, endpoint_url=endpoint_url, graph_uri=graph_uri)

        # -- Discover classes -------------------------------------------------
        classes = self._discover_classes(g)  # local_name -> full URI

        logger.info("Discovered %d classes: %s", len(classes), list(classes.keys()))

        # -- Discover datatype properties -> vertex fields --------------------
        fields_by_class: dict[str, list[str]] = {c: ["_key", "_uri"] for c in classes}

        for dp in g.subjects(RDF.type, OWL.DatatypeProperty):
            dp_name = _local_name(str(dp))
            if not dp_name:
                # Mirror the class filter: a property without a usable local
                # name cannot become a field.
                continue
            for domain in g.objects(dp, RDFS.domain):
                class_fields = fields_by_class.get(_local_name(str(domain)))
                # Properties from different namespaces can share a local name
                # (or collide with "_key" / "_uri"); keep each field once.
                if class_fields is not None and dp_name not in class_fields:
                    class_fields.append(dp_name)

        # -- Discover object properties -> edges ------------------------------
        edges: list[dict[str, str]] = []
        seen_edges: set[tuple[str, str, str]] = set()
        for op in g.subjects(RDF.type, OWL.ObjectProperty):
            op_name = _local_name(str(op))
            domains = [_local_name(str(d)) for d in g.objects(op, RDFS.domain)]
            ranges = [_local_name(str(r)) for r in g.objects(op, RDFS.range)]

            for src in domains:
                if src not in classes:
                    continue
                for tgt in ranges:
                    if tgt not in classes:
                        continue
                    edge_key = (src, tgt, op_name)
                    # Avoid emitting the same (source, target, relation)
                    # triple twice when local names collide.
                    if edge_key in seen_edges:
                        continue
                    seen_edges.add(edge_key)
                    edges.append(
                        {"source": src, "target": tgt, "relation": op_name}
                    )

        logger.info("Discovered %d edges", len(edges))

        # -- Build Schema artefacts -------------------------------------------
        vertices = [
            Vertex(name=cls_name, fields=[VertexField(name=f) for f in fields])
            for cls_name, fields in fields_by_class.items()
        ]
        vertex_config = VertexConfig(vertices=vertices)

        edge_objects = [
            Edge(
                source=e["source"],
                target=e["target"],
                relation=e["relation"],
            )
            for e in edges
        ]
        edge_config = EdgeConfig(edges=edge_objects)

        # -- Build Resources (one per class) ----------------------------------
        resources: list[Resource] = []
        for cls_name in classes:
            pipeline: list[dict[str, Any]] = [{"vertex": cls_name}]
            # Each pipeline step is a fresh copy of the edge definition
            # (keys: source, target, relation).
            pipeline.extend(
                dict(edge_def) for edge_def in edges if edge_def["source"] == cls_name
            )
            resources.append(Resource(name=cls_name, pipeline=pipeline))

        effective_name = schema_name or "rdf_schema"
        schema = Schema(
            metadata=GraphMetadata(name=effective_name),
            core_schema=CoreSchema(
                vertex_config=vertex_config, edge_config=edge_config
            ),
            db_profile=DatabaseProfile(db_flavor=self.target_db_flavor),
        )
        ingestion_model = IngestionModel(resources=resources)
        ingestion_model.finish_init(schema.core_schema)
        return schema, ingestion_model

    def create_bindings(
        self,
        source: str | Path,
        *,
        endpoint_url: str | None = None,
        graph_uri: str | None = None,
    ) -> Bindings:
        """Create :class:`Bindings` from an RDF ontology.

        One :class:`SparqlConnector` is created per ``owl:Class`` / ``rdfs:Class``.
        The ontology is always loaded from *source* (a local file).  The
        *endpoint_url* is attached to each connector for runtime data queries
        but is **not** used to load the ontology itself.  When no endpoint is
        given, *source* is also attached to each connector as ``rdf_file``.

        Args:
            source: Path to an RDF file containing the ontology.
            endpoint_url: SPARQL endpoint for the data (ABox) at runtime.
            graph_uri: Named graph containing the data.

        Returns:
            Bindings with one SparqlConnector per class.
        """
        # Always load the ontology from the local file, not from the endpoint.
        g = _load_graph(source)

        classes = self._discover_classes(g)

        # The data file is only handed to the connector when there is no
        # endpoint to query instead.
        rdf_file = None if endpoint_url else Path(source)

        bindings = Bindings()
        for cls_name, cls_uri in classes.items():
            connector = SparqlConnector(
                rdf_class=cls_uri,
                endpoint_url=endpoint_url,
                graph_uri=graph_uri,
                rdf_file=rdf_file,
            )
            bindings.add_connector(connector)
            bindings.bind_resource(cls_name, connector)

        logger.info(
            "Created %d SPARQL connectors from ontology",
            len(classes),
        )
        return bindings

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _discover_classes(g: Any) -> dict[str, str]:
        """Map class local names to full URIs for every declared class in *g*.

        Subjects typed ``owl:Class`` or ``rdfs:Class`` are collected; those
        living in the OWL / RDFS namespaces themselves, or without a usable
        local name, are dropped.

        URIs are visited in sorted order so that the resulting mapping (and
        which URI wins when two share a local name) is identical on every
        run; iterating the raw set would depend on per-process hash order.

        Args:
            g: Graph exposing ``subjects(predicate, object)``.

        Returns:
            Mapping ``local_name -> full URI``.
        """
        from rdflib import OWL, RDF, RDFS

        builtin_namespaces = (str(OWL), str(RDFS))
        class_uris = {str(u) for u in g.subjects(RDF.type, OWL.Class)}
        class_uris.update(str(u) for u in g.subjects(RDF.type, RDFS.Class))

        classes: dict[str, str] = {}
        for uri_str in sorted(class_uris):
            name = _local_name(uri_str)
            if name and not uri_str.startswith(builtin_namespaces):
                classes[name] = uri_str
        return classes

create_bindings(source, *, endpoint_url=None, graph_uri=None)

Create `Bindings` from an RDF ontology.

One `SparqlConnector` is created per owl:Class / rdfs:Class. The ontology is always loaded from source (a local file). The endpoint_url is attached to each connector for runtime data queries but is not used to load the ontology itself.

Parameters:

Name Type Description Default
source str | Path

Path to an RDF file containing the ontology.

required
endpoint_url str | None

SPARQL endpoint for the data (ABox) at runtime.

None
graph_uri str | None

Named graph containing the data.

None

Returns:

Type Description
Bindings

Bindings with one SparqlConnector per class.

Source code in graflo/hq/rdf_inferencer.py
def create_bindings(
    self,
    source: str | Path,
    *,
    endpoint_url: str | None = None,
    graph_uri: str | None = None,
) -> Bindings:
    """Create :class:`Bindings` from an RDF ontology.

    One :class:`SparqlConnector` is created per ``owl:Class`` / ``rdfs:Class``.
    The ontology is always loaded from *source* (a local file).  The
    *endpoint_url* is attached to each connector for runtime data queries
    but is **not** used to load the ontology itself.  When no endpoint is
    given, *source* is also attached to each connector as ``rdf_file``.

    Args:
        source: Path to an RDF file containing the ontology.
        endpoint_url: SPARQL endpoint for the data (ABox) at runtime.
        graph_uri: Named graph containing the data.

    Returns:
        Bindings with one SparqlConnector per class.
    """
    from rdflib import OWL, RDF, RDFS

    # Always load the ontology from the local file, not from the endpoint.
    g = _load_graph(source)

    # Collect class URIs as plain strings and visit them in sorted order so
    # connector order (and the winner of a local-name collision) does not
    # depend on per-process set iteration order.
    builtin_namespaces = (str(OWL), str(RDFS))
    class_uris = {str(u) for u in g.subjects(RDF.type, OWL.Class)}
    class_uris.update(str(u) for u in g.subjects(RDF.type, RDFS.Class))

    classes: dict[str, str] = {}
    for uri_str in sorted(class_uris):
        name = _local_name(uri_str)
        # Skip the OWL / RDFS vocabulary itself and unnamed classes.
        if name and not uri_str.startswith(builtin_namespaces):
            classes[name] = uri_str

    # The data file is only handed to the connector when there is no
    # endpoint to query instead.
    rdf_file = None if endpoint_url else Path(source)

    bindings = Bindings()
    for cls_name, cls_uri in classes.items():
        connector = SparqlConnector(
            rdf_class=cls_uri,
            endpoint_url=endpoint_url,
            graph_uri=graph_uri,
            rdf_file=rdf_file,
        )
        bindings.add_connector(connector)
        bindings.bind_resource(cls_name, connector)

    logger.info(
        "Created %d SPARQL connectors from ontology",
        len(classes),
    )
    return bindings

infer_schema(source, *, endpoint_url=None, graph_uri=None, schema_name=None)

Infer a complete graflo Schema from an RDF/OWL ontology.

Parameters:

Name Type Description Default
source str | Path

Path to an RDF file or a base URL (when using endpoint).

required
endpoint_url str | None

SPARQL endpoint to CONSTRUCT the ontology from.

None
graph_uri str | None

Named graph containing the ontology.

None
schema_name str | None

Name for the resulting schema.

None

Returns:

Type Description
tuple[Schema, IngestionModel]

Fully initialised schema and ingestion model.

Source code in graflo/hq/rdf_inferencer.py
def infer_schema(
    self,
    source: str | Path,
    *,
    endpoint_url: str | None = None,
    graph_uri: str | None = None,
    schema_name: str | None = None,
) -> tuple[Schema, IngestionModel]:
    """Infer a complete graflo Schema from an RDF/OWL ontology.

    Classes become vertices, datatype properties become fields on their
    ``rdfs:domain`` vertex, and object properties become edges between
    every (domain, range) pair whose both ends are discovered classes.
    One resource is created per class; its pipeline contains the vertex
    step followed by the edges that originate from that class.

    Args:
        source: Path to an RDF file or a base URL (when using endpoint).
        endpoint_url: SPARQL endpoint to CONSTRUCT the ontology from.
        graph_uri: Named graph containing the ontology.
        schema_name: Name for the resulting schema; falls back to
            ``"rdf_schema"`` when omitted or empty.

    Returns:
        tuple[Schema, IngestionModel]: fully initialised schema and ingestion model.
    """
    from rdflib import OWL, RDF, RDFS

    g = _load_graph(source, endpoint_url=endpoint_url, graph_uri=graph_uri)

    # -- Discover classes -------------------------------------------------
    # Collect class URIs as plain strings and visit them in sorted order so
    # vertex/resource order (and the winner of a local-name collision) does
    # not depend on per-process set iteration order.
    builtin_namespaces = (str(OWL), str(RDFS))
    class_uris = {str(u) for u in g.subjects(RDF.type, OWL.Class)}
    class_uris.update(str(u) for u in g.subjects(RDF.type, RDFS.Class))

    classes: dict[str, str] = {}  # local_name -> full URI
    for uri_str in sorted(class_uris):
        name = _local_name(uri_str)
        # Skip the OWL / RDFS vocabulary itself and unnamed classes.
        if name and not uri_str.startswith(builtin_namespaces):
            classes[name] = uri_str

    logger.info("Discovered %d classes: %s", len(classes), list(classes.keys()))

    # -- Discover datatype properties -> vertex fields --------------------
    fields_by_class: dict[str, list[str]] = {c: ["_key", "_uri"] for c in classes}

    for dp in g.subjects(RDF.type, OWL.DatatypeProperty):
        dp_name = _local_name(str(dp))
        if not dp_name:
            # Mirror the class filter: a property without a usable local
            # name cannot become a field.
            continue
        for domain in g.objects(dp, RDFS.domain):
            class_fields = fields_by_class.get(_local_name(str(domain)))
            # Properties from different namespaces can share a local name
            # (or collide with "_key" / "_uri"); keep each field once.
            if class_fields is not None and dp_name not in class_fields:
                class_fields.append(dp_name)

    # -- Discover object properties -> edges ------------------------------
    edges: list[dict[str, str]] = []
    seen_edges: set[tuple[str, str, str]] = set()
    for op in g.subjects(RDF.type, OWL.ObjectProperty):
        op_name = _local_name(str(op))
        domains = [_local_name(str(d)) for d in g.objects(op, RDFS.domain)]
        ranges = [_local_name(str(r)) for r in g.objects(op, RDFS.range)]

        for src in domains:
            if src not in classes:
                continue
            for tgt in ranges:
                if tgt not in classes:
                    continue
                edge_key = (src, tgt, op_name)
                # Avoid emitting the same (source, target, relation)
                # triple twice when local names collide.
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)
                edges.append(
                    {"source": src, "target": tgt, "relation": op_name}
                )

    logger.info("Discovered %d edges", len(edges))

    # -- Build Schema artefacts -------------------------------------------
    vertices = [
        Vertex(name=cls_name, fields=[VertexField(name=f) for f in fields])
        for cls_name, fields in fields_by_class.items()
    ]
    vertex_config = VertexConfig(vertices=vertices)

    edge_objects = [
        Edge(
            source=e["source"],
            target=e["target"],
            relation=e["relation"],
        )
        for e in edges
    ]
    edge_config = EdgeConfig(edges=edge_objects)

    # -- Build Resources (one per class) ----------------------------------
    resources: list[Resource] = []
    for cls_name in classes:
        pipeline: list[dict[str, Any]] = [{"vertex": cls_name}]
        # Each pipeline step is a fresh copy of the edge definition
        # (keys: source, target, relation).
        pipeline.extend(
            dict(edge_def) for edge_def in edges if edge_def["source"] == cls_name
        )
        resources.append(Resource(name=cls_name, pipeline=pipeline))

    effective_name = schema_name or "rdf_schema"
    schema = Schema(
        metadata=GraphMetadata(name=effective_name),
        core_schema=CoreSchema(
            vertex_config=vertex_config, edge_config=edge_config
        ),
        db_profile=DatabaseProfile(db_flavor=self.target_db_flavor),
    )
    ingestion_model = IngestionModel(resources=resources)
    ingestion_model.finish_init(schema.core_schema)
    return schema, ingestion_model