RDF data source hierarchy.
Provides two concrete data sources that share a common abstract parent:

- :class:`RdfFileDataSource` – reads local RDF files (Turtle, RDF/XML, N3,
  JSON-LD, …) via rdflib.
- :class:`SparqlEndpointDataSource` – queries a remote SPARQL endpoint
  (e.g. Apache Fuseki) via SPARQLWrapper.
Both convert RDF triples into flat dictionaries grouped by subject URI, one
dict per rdf:Class instance.
Uses rdflib and SPARQLWrapper, which are core dependencies of
graflo (see pyproject.toml).
RdfDataSource
Bases: AbstractDataSource, ABC
Abstract base for RDF data sources (file and endpoint).
Captures the fields and batch-yielding logic shared by both
:class:`RdfFileDataSource` and :class:`SparqlEndpointDataSource`.

Attributes:

| Name        | Type          | Description                                          |
|-------------|---------------|------------------------------------------------------|
| `rdf_class` | `str \| None` | Optional URI of the rdf:Class to filter subjects by. |
Source code in graflo/data_source/rdf.py
class RdfDataSource(AbstractDataSource, abc.ABC):
    """Abstract base for RDF data sources (file and endpoint).

    Captures the fields and batch-yielding logic shared by both
    :class:`RdfFileDataSource` and :class:`SparqlEndpointDataSource`.

    Attributes:
        rdf_class: Optional URI of the ``rdf:Class`` to filter subjects by.
    """

    source_type: DataSourceType = DataSourceType.SPARQL
    rdf_class: str | None = Field(
        default=None, description="URI of the rdf:Class to filter by"
    )

    @staticmethod
    def _yield_batches(
        docs: list[dict], batch_size: int, limit: int | None
    ) -> Iterator[list[dict]]:
        """Apply *limit*, then yield *docs* in chunks of *batch_size*.

        Yields nothing when *docs* is empty (or *limit* is ``0``); every
        yielded batch is non-empty.
        """
        if limit is not None:
            docs = docs[:limit]
        # range(0, len(docs), step) naturally produces no iterations for an
        # empty list, so no per-batch emptiness guard is needed.
        for start in range(0, len(docs), batch_size):
            yield docs[start : start + batch_size]
RdfFileDataSource
Bases: RdfDataSource
Data source for local RDF files.
Parses RDF files using rdflib and yields flat dictionaries grouped by
subject URI. Optionally filters by rdf_class so that only instances
of a specific class are returned.
Attributes:

| Name         | Type          | Description                                                                                                  |
|--------------|---------------|--------------------------------------------------------------------------------------------------------------|
| `path`       | `Path`        | Path to the RDF file.                                                                                        |
| `rdf_format` | `str \| None` | Explicit rdflib format string (e.g. `"turtle"`). When `None` the format is guessed from the file extension.  |
Source code in graflo/data_source/rdf.py
class RdfFileDataSource(RdfDataSource):
    """Data source for local RDF files.

    Parses RDF files using *rdflib* and yields flat dictionaries grouped by
    subject URI. Optionally filters by ``rdf_class`` so that only instances
    of a specific class are returned.

    Attributes:
        path: Path to the RDF file.
        rdf_format: Explicit rdflib format string (e.g. ``"turtle"``).
            When ``None`` the format is guessed from the file extension.
    """

    path: Path
    rdf_format: str | None = Field(
        default=None, description="rdflib serialization format"
    )

    def _resolve_format(self) -> str:
        """Return the rdflib format string, guessing from extension if needed.

        Raises:
            ValueError: If ``rdf_format`` is unset and the file extension is
                not in the known-extension map.
        """
        if self.rdf_format:
            return self.rdf_format
        ext = self.path.suffix.lower()
        fmt = _EXT_FORMAT.get(ext)
        if fmt is None:
            raise ValueError(
                f"Cannot determine RDF format for extension '{ext}'. "
                f"Set rdf_format explicitly. Known: {list(_EXT_FORMAT.keys())}"
            )
        return fmt

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Parse the RDF file and yield batches of flat dictionaries.

        Raises:
            ImportError: If rdflib is not installed.
            ValueError: If the RDF format cannot be determined.
        """
        try:
            from rdflib import Graph
        except ImportError as exc:
            raise ImportError(
                "rdflib is required for RDF data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install rdflib manually."
            ) from exc
        # Resolve the format once: it is needed for both parsing and logging,
        # and resolution may raise ValueError.
        rdf_format = self._resolve_format()
        g = Graph()
        g.parse(str(self.path), format=rdf_format)
        logger.info(
            "Parsed %d triples from %s (format=%s)",
            len(g),
            self.path,
            rdf_format,
        )
        docs = _triples_to_docs(g, rdf_class=self.rdf_class)
        yield from self._yield_batches(docs, batch_size, limit)
iter_batches(batch_size=1000, limit=None)
Parse the RDF file and yield batches of flat dictionaries.
Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Parse the RDF file and yield batches of flat dictionaries.

    Raises:
        ImportError: If rdflib is not installed.
        ValueError: If the RDF format cannot be determined.
    """
    try:
        from rdflib import Graph
    except ImportError as exc:
        raise ImportError(
            "rdflib is required for RDF data sources. "
            "It is a core dependency of graflo; reinstall with "
            "`pip install --force-reinstall graflo` or install rdflib manually."
        ) from exc
    # Resolve the format once: it is needed for both parsing and logging,
    # and resolution may raise ValueError.
    rdf_format = self._resolve_format()
    g = Graph()
    g.parse(str(self.path), format=rdf_format)
    logger.info(
        "Parsed %d triples from %s (format=%s)",
        len(g),
        self.path,
        rdf_format,
    )
    docs = _triples_to_docs(g, rdf_class=self.rdf_class)
    yield from self._yield_batches(docs, batch_size, limit)
SparqlEndpointDataSource
Bases: RdfDataSource
Data source that reads from a SPARQL endpoint.
Uses SPARQLWrapper to query an endpoint and returns flat dictionaries
grouped by subject.
Attributes:

| Name     | Type                 | Description                  |
|----------|----------------------|------------------------------|
| `config` | `SparqlSourceConfig` | SPARQL source configuration. |
Source code in graflo/data_source/rdf.py
class SparqlEndpointDataSource(RdfDataSource):
    """Data source that reads from a SPARQL endpoint.

    Uses ``SPARQLWrapper`` to query an endpoint and returns flat dictionaries
    grouped by subject.

    Attributes:
        config: SPARQL source configuration.
    """

    config: SparqlSourceConfig

    def _create_wrapper(self) -> Any:
        """Create a configured ``SPARQLWrapper`` instance.

        Raises:
            ImportError: If SPARQLWrapper is not installed.
        """
        try:
            from SPARQLWrapper import JSON, SPARQLWrapper
        except ImportError as exc:
            raise ImportError(
                "SPARQLWrapper is required for SPARQL endpoint data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install SPARQLWrapper manually."
            ) from exc
        sparql = SPARQLWrapper(self.config.endpoint_url)
        sparql.setReturnFormat(JSON)
        if self.config.username and self.config.password:
            sparql.setCredentials(self.config.username, self.config.password)
        return sparql

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Query the SPARQL endpoint and yield batches of flat dictionaries.

        Paginates using SPARQL LIMIT/OFFSET and stops fetching pages as soon
        as *limit* docs have been collected.
        """
        wrapper = self._create_wrapper()
        offset = 0
        all_docs: list[dict] = []
        while True:
            query = self.config.build_query(offset=offset, limit=self.config.page_size)
            wrapper.setQuery(query)
            logger.debug("SPARQL query (offset=%d): %s", offset, query)
            results = wrapper.queryAndConvert()
            bindings = results.get("results", {}).get("bindings", [])
            if not bindings:
                break
            all_docs.extend(_sparql_results_to_docs(results))
            # Don't fetch further pages once *limit* is satisfied;
            # _yield_batches truncates to exactly *limit* below.
            if limit is not None and len(all_docs) >= limit:
                break
            if len(bindings) < self.config.page_size:
                break  # short page => endpoint exhausted
            offset += self.config.page_size
        yield from self._yield_batches(all_docs, batch_size, limit)
iter_batches(batch_size=1000, limit=None)
Query the SPARQL endpoint and yield batches of flat dictionaries.
Paginates using SPARQL LIMIT/OFFSET.
Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Query the SPARQL endpoint and yield batches of flat dictionaries.

    Paginates using SPARQL LIMIT/OFFSET and stops fetching pages as soon
    as *limit* docs have been collected.
    """
    wrapper = self._create_wrapper()
    offset = 0
    all_docs: list[dict] = []
    while True:
        query = self.config.build_query(offset=offset, limit=self.config.page_size)
        wrapper.setQuery(query)
        logger.debug("SPARQL query (offset=%d): %s", offset, query)
        results = wrapper.queryAndConvert()
        bindings = results.get("results", {}).get("bindings", [])
        if not bindings:
            break
        all_docs.extend(_sparql_results_to_docs(results))
        # Don't fetch further pages once *limit* is satisfied;
        # _yield_batches truncates to exactly *limit* below.
        if limit is not None and len(all_docs) >= limit:
            break
        if len(bindings) < self.config.page_size:
            break  # short page => endpoint exhausted
        offset += self.config.page_size
    yield from self._yield_batches(all_docs, batch_size, limit)
SparqlSourceConfig
Bases: ConfigBaseModel
Configuration for a SPARQL endpoint data source.
Attributes:

| Name           | Type          | Description                                                                   |
|----------------|---------------|-------------------------------------------------------------------------------|
| `endpoint_url` | `str`         | Full SPARQL query endpoint URL (e.g. `http://localhost:3030/dataset/sparql`). |
| `rdf_class`    | `str \| None` | URI of the rdf:Class whose instances to fetch.                                |
| `graph_uri`    | `str \| None` | Named graph to restrict the query to (optional).                              |
| `sparql_query` | `str \| None` | Custom SPARQL query override (optional).                                      |
| `username`     | `str \| None` | HTTP basic-auth username (optional).                                          |
| `password`     | `str \| None` | HTTP basic-auth password (optional).                                          |
| `page_size`    | `int`         | Number of results per SPARQL LIMIT/OFFSET page.                               |
Source code in graflo/data_source/rdf.py
class SparqlSourceConfig(ConfigBaseModel):
    """Configuration for a SPARQL endpoint data source.

    Attributes:
        endpoint_url: Full SPARQL query endpoint URL
            (e.g. ``http://localhost:3030/dataset/sparql``)
        rdf_class: URI of the rdf:Class whose instances to fetch
        graph_uri: Named graph to restrict the query to (optional)
        sparql_query: Custom SPARQL query override (optional)
        username: HTTP basic-auth username (optional)
        password: HTTP basic-auth password (optional)
        page_size: Number of results per SPARQL LIMIT/OFFSET page
    """

    endpoint_url: str
    rdf_class: str | None = None
    graph_uri: str | None = None
    sparql_query: str | None = None
    username: str | None = None
    password: str | None = None
    page_size: int = Field(default=10_000, description="SPARQL pagination page size")

    def build_query(self, offset: int = 0, limit: int | None = None) -> str:
        """Build a SPARQL SELECT query.

        A custom *sparql_query*, when configured, is used verbatim (minus any
        trailing semicolon) with LIMIT/OFFSET appended. Otherwise a generic
        triple-pattern query is generated::

            SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }

        optionally wrapped in a ``GRAPH`` clause when *graph_uri* is set.
        """
        if self.sparql_query:
            base = self.sparql_query.rstrip().rstrip(";")
        else:
            if self.graph_uri:
                graph_open = f"GRAPH <{self.graph_uri}> {{"
                graph_close = "}"
            else:
                graph_open = graph_close = ""
            class_filter = ""
            if self.rdf_class:
                class_filter = f"?s a <{self.rdf_class}> . "
            base = (
                f"SELECT ?s ?p ?o WHERE {{ {graph_open} "
                f"{class_filter}?s ?p ?o . {graph_close} }}"
            )
        effective_limit = self.page_size if limit is None else limit
        return f"{base} LIMIT {effective_limit} OFFSET {offset}"
build_query(offset=0, limit=None)
Build a SPARQL SELECT query.
If sparql_query is set it is returned with LIMIT/OFFSET appended.
Otherwise generates::
SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
Source code in graflo/data_source/rdf.py
| def build_query(self, offset: int = 0, limit: int | None = None) -> str:
"""Build a SPARQL SELECT query.
If *sparql_query* is set it is returned with LIMIT/OFFSET appended.
Otherwise generates::
SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
"""
if self.sparql_query:
base = self.sparql_query.rstrip().rstrip(";")
else:
graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else ""
graph_close = "}" if self.graph_uri else ""
class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else ""
base = (
f"SELECT ?s ?p ?o WHERE {{ "
f"{graph_open} "
f"{class_filter}"
f"?s ?p ?o . "
f"{graph_close} "
f"}}"
)
effective_limit = limit if limit is not None else self.page_size
return f"{base} LIMIT {effective_limit} OFFSET {offset}"
|