Skip to content

graflo.data_source.rdf

RDF data source hierarchy.

Provides two concrete data sources that share a common abstract parent:

  • :class:RdfFileDataSource – reads local RDF files (Turtle, RDF/XML, N3, JSON-LD, …) via rdflib.
  • :class:SparqlEndpointDataSource – queries a remote SPARQL endpoint (e.g. Apache Fuseki) via SPARQLWrapper.

Both convert RDF triples into flat dictionaries grouped by subject URI, one dict per rdf:Class instance.

Uses rdflib and SPARQLWrapper, which are core dependencies of graflo (see pyproject.toml).

RdfDataSource

Bases: AbstractDataSource, ABC

Abstract base for RDF data sources (file and endpoint).

Captures the fields and batch-yielding logic shared by both :class:RdfFileDataSource and :class:SparqlEndpointDataSource.

Attributes:

Name Type Description
rdf_class str | None

Optional URI of the rdf:Class to filter subjects by.

Source code in graflo/data_source/rdf.py
class RdfDataSource(AbstractDataSource, abc.ABC):
    """Abstract base for RDF data sources (file and endpoint).

    Captures the fields and batch-yielding logic shared by both
    :class:`RdfFileDataSource` and :class:`SparqlEndpointDataSource`.

    Attributes:
        rdf_class: Optional URI of the ``rdf:Class`` to filter subjects by.
    """

    source_type: DataSourceType = DataSourceType.SPARQL
    rdf_class: str | None = Field(
        default=None, description="URI of the rdf:Class to filter by"
    )

    @staticmethod
    def _yield_batches(
        docs: list[dict], batch_size: int, limit: int | None
    ) -> Iterator[list[dict]]:
        """Apply *limit*, then yield *docs* in chunks of *batch_size*.

        Args:
            docs: Flat documents to emit.
            batch_size: Maximum documents per yielded batch. Must be a
                positive step for ``range``; non-positive values either
                raise ``ValueError`` (0) or yield nothing (negative),
                matching prior behavior.
            limit: Optional cap on the total number of documents emitted.

        Yields:
            Non-empty lists of at most *batch_size* documents. An empty
            *docs* (or ``limit=0``) yields nothing.
        """
        if limit is not None:
            docs = docs[:limit]
        # Iterating up to len(docs) means an empty input produces no
        # iterations and every slice is guaranteed non-empty, so the old
        # `max(len(docs), 1)` bound and `if batch` guard are unnecessary.
        for start in range(0, len(docs), batch_size):
            yield docs[start : start + batch_size]

RdfFileDataSource

Bases: RdfDataSource

Data source for local RDF files.

Parses RDF files using rdflib and yields flat dictionaries grouped by subject URI. Optionally filters by rdf_class so that only instances of a specific class are returned.

Attributes:

Name Type Description
path Path

Path to the RDF file.

rdf_format str | None

Explicit rdflib format string (e.g. "turtle"). When None the format is guessed from the file extension.

Source code in graflo/data_source/rdf.py
class RdfFileDataSource(RdfDataSource):
    """Data source for local RDF files.

    Parses RDF files using *rdflib* and yields flat dictionaries grouped by
    subject URI.  Optionally filters by ``rdf_class`` so that only instances
    of a specific class are returned.

    Attributes:
        path: Path to the RDF file.
        rdf_format: Explicit rdflib format string (e.g. ``"turtle"``).
            When ``None`` the format is guessed from the file extension.
    """

    path: Path
    rdf_format: str | None = Field(
        default=None, description="rdflib serialization format"
    )

    def _resolve_format(self) -> str:
        """Return the rdflib format string, guessing from extension if needed.

        Raises:
            ValueError: If ``rdf_format`` is unset and the file extension is
                not present in the ``_EXT_FORMAT`` mapping.
        """
        if self.rdf_format:
            return self.rdf_format
        ext = self.path.suffix.lower()
        fmt = _EXT_FORMAT.get(ext)
        if fmt is None:
            raise ValueError(
                f"Cannot determine RDF format for extension '{ext}'. "
                f"Set rdf_format explicitly. Known: {list(_EXT_FORMAT.keys())}"
            )
        return fmt

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Parse the RDF file and yield batches of flat dictionaries.

        Args:
            batch_size: Maximum documents per yielded batch.
            limit: Optional cap on the total number of documents yielded.
        """
        try:
            from rdflib import Graph
        except ImportError as exc:
            raise ImportError(
                "rdflib is required for RDF data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install rdflib manually."
            ) from exc

        # Resolve the format once: the original resolved it a second time
        # just for logging, repeating the extension lookup.
        fmt = self._resolve_format()
        g = Graph()
        g.parse(str(self.path), format=fmt)
        logger.info("Parsed %d triples from %s (format=%s)", len(g), self.path, fmt)

        docs = _triples_to_docs(g, rdf_class=self.rdf_class)
        yield from self._yield_batches(docs, batch_size, limit)

iter_batches(batch_size=1000, limit=None)

Parse the RDF file and yield batches of flat dictionaries.

Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Parse the RDF file and yield batches of flat dictionaries.

    Args:
        batch_size: Maximum documents per yielded batch.
        limit: Optional cap on the total number of documents yielded.
    """
    try:
        from rdflib import Graph
    except ImportError as exc:
        raise ImportError(
            "rdflib is required for RDF data sources. "
            "It is a core dependency of graflo; reinstall with "
            "`pip install --force-reinstall graflo` or install rdflib manually."
        ) from exc

    # Resolve the format once: the original resolved it a second time
    # just for logging, repeating the extension lookup.
    fmt = self._resolve_format()
    g = Graph()
    g.parse(str(self.path), format=fmt)
    logger.info("Parsed %d triples from %s (format=%s)", len(g), self.path, fmt)

    docs = _triples_to_docs(g, rdf_class=self.rdf_class)
    yield from self._yield_batches(docs, batch_size, limit)

SparqlEndpointDataSource

Bases: RdfDataSource

Data source that reads from a SPARQL endpoint.

Uses SPARQLWrapper to query an endpoint and returns flat dictionaries grouped by subject.

Attributes:

Name Type Description
config SparqlSourceConfig

SPARQL source configuration.

Source code in graflo/data_source/rdf.py
class SparqlEndpointDataSource(RdfDataSource):
    """Data source that reads from a SPARQL endpoint.

    Uses ``SPARQLWrapper`` to query an endpoint and returns flat dictionaries
    grouped by subject.

    Attributes:
        config: SPARQL source configuration.
    """

    config: SparqlSourceConfig

    def _create_wrapper(self) -> Any:
        """Create a configured ``SPARQLWrapper`` instance.

        Raises:
            ImportError: If SPARQLWrapper is not installed.
        """
        try:
            from SPARQLWrapper import JSON, SPARQLWrapper
        except ImportError as exc:
            raise ImportError(
                "SPARQLWrapper is required for SPARQL endpoint data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install SPARQLWrapper manually."
            ) from exc

        sparql = SPARQLWrapper(self.config.endpoint_url)
        sparql.setReturnFormat(JSON)
        # Basic auth is optional; only configure it when both parts are set.
        if self.config.username and self.config.password:
            sparql.setCredentials(self.config.username, self.config.password)
        return sparql

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Query the SPARQL endpoint and yield batches of flat dictionaries.

        Paginates using SPARQL LIMIT/OFFSET. When *limit* is given, pagination
        stops as soon as enough documents have been collected instead of
        fetching the whole result set.

        Args:
            batch_size: Maximum documents per yielded batch.
            limit: Optional cap on the total number of documents yielded.
        """
        wrapper = self._create_wrapper()
        page_size = self.config.page_size
        offset = 0
        all_docs: list[dict] = []

        while True:
            query = self.config.build_query(offset=offset, limit=page_size)
            wrapper.setQuery(query)

            logger.debug("SPARQL query (offset=%d): %s", offset, query)
            results = wrapper.queryAndConvert()

            bindings = results.get("results", {}).get("bindings", [])
            if not bindings:
                break

            all_docs.extend(_sparql_results_to_docs(results))

            # Early exit once *limit* subjects are collected; _yield_batches
            # truncates to exactly *limit* below, so the output is unchanged
            # but we avoid paginating through the rest of the result set.
            if limit is not None and len(all_docs) >= limit:
                break
            # A short page means the endpoint has no further results.
            if len(bindings) < page_size:
                break

            offset += page_size

        yield from self._yield_batches(all_docs, batch_size, limit)

iter_batches(batch_size=1000, limit=None)

Query the SPARQL endpoint and yield batches of flat dictionaries.

Paginates using SPARQL LIMIT/OFFSET.

Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Query the SPARQL endpoint and yield batches of flat dictionaries.

    Paginates using SPARQL LIMIT/OFFSET. When *limit* is given, pagination
    stops as soon as enough documents have been collected instead of
    fetching the whole result set.

    Args:
        batch_size: Maximum documents per yielded batch.
        limit: Optional cap on the total number of documents yielded.
    """
    wrapper = self._create_wrapper()
    page_size = self.config.page_size
    offset = 0
    all_docs: list[dict] = []

    while True:
        query = self.config.build_query(offset=offset, limit=page_size)
        wrapper.setQuery(query)

        logger.debug("SPARQL query (offset=%d): %s", offset, query)
        results = wrapper.queryAndConvert()

        bindings = results.get("results", {}).get("bindings", [])
        if not bindings:
            break

        all_docs.extend(_sparql_results_to_docs(results))

        # Early exit once *limit* subjects are collected; _yield_batches
        # truncates to exactly *limit* below, so the output is unchanged
        # but we avoid paginating through the rest of the result set.
        if limit is not None and len(all_docs) >= limit:
            break
        # A short page means the endpoint has no further results.
        if len(bindings) < page_size:
            break

        offset += page_size

    yield from self._yield_batches(all_docs, batch_size, limit)

SparqlSourceConfig

Bases: ConfigBaseModel

Configuration for a SPARQL endpoint data source.

Attributes:

Name Type Description
endpoint_url str

Full SPARQL query endpoint URL (e.g. http://localhost:3030/dataset/sparql)

rdf_class str | None

URI of the rdf:Class whose instances to fetch

graph_uri str | None

Named graph to restrict the query to (optional)

sparql_query str | None

Custom SPARQL query override (optional)

username str | None

HTTP basic-auth username (optional)

password str | None

HTTP basic-auth password (optional)

page_size int

Number of results per SPARQL LIMIT/OFFSET page

Source code in graflo/data_source/rdf.py
class SparqlSourceConfig(ConfigBaseModel):
    """Configuration for a SPARQL endpoint data source.

    Attributes:
        endpoint_url: Full SPARQL query endpoint URL
            (e.g. ``http://localhost:3030/dataset/sparql``)
        rdf_class: URI of the rdf:Class whose instances to fetch
        graph_uri: Named graph to restrict the query to (optional)
        sparql_query: Custom SPARQL query override (optional)
        username: HTTP basic-auth username (optional)
        password: HTTP basic-auth password (optional)
        page_size: Number of results per SPARQL LIMIT/OFFSET page
    """

    endpoint_url: str
    rdf_class: str | None = None
    graph_uri: str | None = None
    sparql_query: str | None = None
    username: str | None = None
    password: str | None = None
    page_size: int = Field(default=10_000, description="SPARQL pagination page size")

    def build_query(self, offset: int = 0, limit: int | None = None) -> str:
        """Build a paginated SPARQL SELECT query.

        A custom ``sparql_query`` (minus trailing whitespace/semicolons)
        takes precedence and simply gets ``LIMIT``/``OFFSET`` appended.
        Otherwise a generic triple-pattern query is generated, optionally
        scoped to ``graph_uri`` and filtered to ``rdf_class`` instances::

            SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
        """
        if self.sparql_query:
            base = self.sparql_query.rstrip().rstrip(";")
        else:
            if self.graph_uri:
                graph_open = f"GRAPH <{self.graph_uri}> {{"
                graph_close = "}"
            else:
                graph_open = graph_close = ""
            class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else ""
            base = (
                "SELECT ?s ?p ?o WHERE { "
                f"{graph_open} {class_filter}?s ?p ?o . {graph_close} }}"
            )

        effective_limit = self.page_size if limit is None else limit
        return f"{base} LIMIT {effective_limit} OFFSET {offset}"

build_query(offset=0, limit=None)

Build a SPARQL SELECT query.

If sparql_query is set it is returned with LIMIT/OFFSET appended. Otherwise generates::

SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
Source code in graflo/data_source/rdf.py
def build_query(self, offset: int = 0, limit: int | None = None) -> str:
    """Build a SPARQL SELECT query.

    If *sparql_query* is set it is returned with LIMIT/OFFSET appended.
    Otherwise generates::

        SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
    """
    if self.sparql_query:
        base = self.sparql_query.rstrip().rstrip(";")
    else:
        graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
        graph_close = "}" if self.graph_uri else ""
        class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else ""
        base = (
            f"SELECT ?s ?p ?o WHERE {{ "
            f"{graph_open} "
            f"{class_filter}"
            f"?s ?p ?o . "
            f"{graph_close} "
            f"}}"
        )

    effective_limit = limit if limit is not None else self.page_size
    return f"{base} LIMIT {effective_limit} OFFSET {offset}"