RDF data source hierarchy.
Provides two concrete data sources that share a common abstract parent:

- `RdfFileDataSource` – reads local RDF files (Turtle, RDF/XML, N3,
  JSON-LD, …) via rdflib.
- `SparqlEndpointDataSource` – queries a remote SPARQL endpoint
  (e.g. Apache Fuseki) via SPARQLWrapper.
Both convert RDF triples into flat dictionaries grouped by subject URI, one
dict per rdf:Class instance. Each document has _uri (full subject URI)
and _key (URI local name — the fragment or last path segment). All other
identity decisions (e.g. using a domain-specific literal as the storage key)
belong in the schema layer, not here.
Uses rdflib and SPARQLWrapper, which are core dependencies of
graflo (see pyproject.toml).
RdfDataSource
Bases: AbstractDataSource, ABC
Abstract base for RDF data sources (file and endpoint).
Captures the fields and batch-yielding logic shared by both
:class:RdfFileDataSource and :class:SparqlEndpointDataSource.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `rdf_class` | `str \| None` | Optional URI of the rdf:Class to filter subjects by. |
Source code in graflo/data_source/rdf.py
class RdfDataSource(AbstractDataSource, abc.ABC):
    """Abstract base for RDF data sources (file and endpoint).

    Captures the fields and batch-yielding logic shared by both
    :class:`RdfFileDataSource` and :class:`SparqlEndpointDataSource`.

    Attributes:
        rdf_class: Optional URI of the ``rdf:Class`` to filter subjects by.
    """

    source_type: DataSourceType = DataSourceType.SPARQL
    # When set, only instances of this rdf:Class are read from the source.
    rdf_class: str | None = Field(
        default=None, description="URI of the rdf:Class to filter by"
    )

    @staticmethod
    def _yield_batches(
        docs: list[dict], batch_size: int, limit: int | None
    ) -> Iterator[list[dict]]:
        """Apply *limit*, then yield *docs* in chunks of *batch_size*.

        Args:
            docs: Flat subject documents to emit.
            batch_size: Maximum documents per yielded batch; must be a
                positive integer (``range`` raises ``ValueError`` otherwise,
                as in the previous implementation).
            limit: Optional cap on the total number of documents emitted.

        Yields:
            Non-empty lists of at most *batch_size* documents.
        """
        if limit is not None:
            docs = docs[:limit]
        # range(0, len(docs), batch_size) is empty for empty input, so an
        # empty batch can never be yielded — this replaces the previous
        # max(len(docs), 1) stepping plus a truthiness guard on each batch.
        for start in range(0, len(docs), batch_size):
            yield docs[start : start + batch_size]
RdfFileDataSource
Bases: RdfDataSource
Data source for local RDF files.
Parses RDF files using rdflib and yields flat dictionaries grouped by
subject URI. Optionally filters by rdf_class so that only instances
of a specific class are returned.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `Path` | Path to the RDF file. |
| `rdf_format` | `str \| None` | Explicit rdflib format string (e.g. `"turtle"`). When `None` the format is guessed from the file extension. |
Source code in graflo/data_source/rdf.py
class RdfFileDataSource(RdfDataSource):
    """Data source for local RDF files.

    Parses RDF files using *rdflib* and yields flat dictionaries grouped by
    subject URI. Optionally filters by ``rdf_class`` so that only instances
    of a specific class are returned.

    Attributes:
        path: Path to the RDF file.
        rdf_format: Explicit rdflib format string (e.g. ``"turtle"``).
            When ``None`` the format is guessed from the file extension.
    """

    path: Path
    rdf_format: str | None = Field(
        default=None, description="rdflib serialization format"
    )

    def _resolve_format(self) -> str:
        """Return the rdflib format string, guessing from extension if needed.

        Raises:
            ValueError: If no explicit ``rdf_format`` is set and the file
                extension is not in the known-extension table.
        """
        if self.rdf_format:
            return self.rdf_format
        ext = self.path.suffix.lower()
        fmt = _EXT_FORMAT.get(ext)
        if fmt is None:
            raise ValueError(
                f"Cannot determine RDF format for extension '{ext}'. "
                f"Set rdf_format explicitly. Known: {list(_EXT_FORMAT.keys())}"
            )
        return fmt

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Parse the RDF file and yield batches of flat dictionaries.

        Raises:
            ImportError: If rdflib cannot be imported.
            ValueError: Propagated from ``_resolve_format``.
        """
        try:
            from rdflib import Graph
        except ImportError as exc:
            raise ImportError(
                "rdflib is required for RDF data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install rdflib manually."
            ) from exc

        # Resolve once: the format is needed both for parsing and for the
        # log line, and _resolve_format may raise.
        rdf_format = self._resolve_format()
        g = Graph()
        g.parse(str(self.path), format=rdf_format)
        logger.info(
            "Parsed %d triples from %s (format=%s)",
            len(g),
            self.path,
            rdf_format,
        )
        docs = _triples_to_docs(g, rdf_class=self.rdf_class)
        yield from self._yield_batches(docs, batch_size, limit)
iter_batches(batch_size=1000, limit=None)
Parse the RDF file and yield batches of flat dictionaries.
Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Parse the RDF file and yield batches of flat dictionaries.

    Raises:
        ImportError: If rdflib cannot be imported.
        ValueError: Propagated from ``_resolve_format``.
    """
    try:
        from rdflib import Graph
    except ImportError as exc:
        raise ImportError(
            "rdflib is required for RDF data sources. "
            "It is a core dependency of graflo; reinstall with "
            "`pip install --force-reinstall graflo` or install rdflib manually."
        ) from exc

    # Resolve once: the format is needed both for parsing and for the
    # log line, and _resolve_format may raise.
    rdf_format = self._resolve_format()
    g = Graph()
    g.parse(str(self.path), format=rdf_format)
    logger.info(
        "Parsed %d triples from %s (format=%s)",
        len(g),
        self.path,
        rdf_format,
    )
    docs = _triples_to_docs(g, rdf_class=self.rdf_class)
    yield from self._yield_batches(docs, batch_size, limit)
SparqlEndpointDataSource
Bases: RdfDataSource
Data source that reads from a SPARQL endpoint.
Uses SPARQLWrapper to query an endpoint and returns flat dictionaries
grouped by subject.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `config` | `SparqlSourceConfig` | SPARQL source configuration. |
Source code in graflo/data_source/rdf.py
class SparqlEndpointDataSource(RdfDataSource):
    """Data source that reads from a SPARQL endpoint.

    Uses ``SPARQLWrapper`` to query an endpoint and returns flat dictionaries
    grouped by subject.

    Attributes:
        config: SPARQL source configuration.
    """

    # Endpoint URL, credentials, pagination size, and query options.
    config: SparqlSourceConfig

    def _create_wrapper(self) -> Any:
        """Create a configured ``SPARQLWrapper`` instance.

        Raises:
            ImportError: If SPARQLWrapper is not installed.
        """
        try:
            from SPARQLWrapper import JSON, SPARQLWrapper
        except ImportError as exc:
            raise ImportError(
                "SPARQLWrapper is required for SPARQL endpoint data sources. "
                "It is a core dependency of graflo; reinstall with "
                "`pip install --force-reinstall graflo` or install SPARQLWrapper manually."
            ) from exc
        sparql = SPARQLWrapper(self.config.endpoint_url)
        # Results are returned as parsed JSON by queryAndConvert() below.
        sparql.setReturnFormat(JSON)
        # HTTP basic auth only when both username and password are configured.
        if self.config.username and self.config.password:
            sparql.setCredentials(self.config.username, self.config.password)
        return sparql

    def iter_batches(
        self, batch_size: int = 1000, limit: int | None = None
    ) -> Iterator[list[dict]]:
        """Query the SPARQL endpoint and yield batches of flat dictionaries.

        Paginates with SPARQL LIMIT/OFFSET on **bindings** (triple rows), merges
        rows into subject documents in a streaming fashion, and stops fetching
        once *limit* subjects have been yielded (when set).
        """
        wrapper = self._create_wrapper()
        offset = 0
        page_size = self.config.page_size

        # Accumulator for the subject currently being assembled. Rows for one
        # subject are contiguous because build_query() adds ORDER BY ?s, so a
        # change in ?s means the previous subject is finished. A subject may
        # still straddle a page boundary, which is why this state lives
        # outside the pagination loop.
        open_uri: str | None = None
        open_doc: dict[str, Any] | None = None
        batch: list[dict] = []
        # Count of *subjects* emitted so far (limit applies to subjects,
        # not triple rows).
        total_emitted = 0

        def subject_completed(doc: dict[str, Any]) -> Iterator[list[dict]]:
            """Append a finished subject and yield when a batch is full."""
            nonlocal batch, total_emitted
            if limit is not None and total_emitted >= limit:
                return
            batch.append(doc)
            total_emitted += 1
            if len(batch) >= batch_size:
                to_send = batch
                batch = []
                yield to_send

        while True:
            # Stop paginating as soon as the subject limit has been reached.
            if limit is not None and total_emitted >= limit:
                break
            query = self.config.build_query(offset=offset, limit=page_size)
            wrapper.setQuery(query)
            logger.debug("SPARQL query (offset=%d): %s", offset, query)
            results = wrapper.queryAndConvert()
            bindings = results.get("results", {}).get("bindings", [])
            if not bindings:
                break
            stop_fetching = False
            for binding in bindings:
                s_val = binding["s"]["value"]
                if open_uri is None:
                    # First row of a new subject (start of stream or after a
                    # flush).
                    open_uri = s_val
                    open_doc = {"_uri": s_val, "_key": _local_name(s_val)}
                elif s_val != open_uri:
                    # Subject changed: flush the finished document, then
                    # start a fresh one for the new subject.
                    assert open_doc is not None
                    yield from subject_completed(open_doc)
                    open_uri = None
                    open_doc = None
                    if limit is not None and total_emitted >= limit:
                        stop_fetching = True
                        break
                    open_uri = s_val
                    open_doc = {"_uri": s_val, "_key": _local_name(s_val)}
                assert open_doc is not None
                _merge_sparql_binding_into_doc(open_doc, binding)
            if stop_fetching:
                break
            # A short page means the endpoint has no further rows.
            if len(bindings) < page_size:
                break
            offset += page_size

        # Flush the last in-progress subject (if any, and still under the
        # limit), then the final partial batch.
        if (
            open_doc is not None
            and open_uri is not None
            and (limit is None or total_emitted < limit)
        ):
            batch.append(open_doc)
        if batch:
            yield batch
iter_batches(batch_size=1000, limit=None)
Query the SPARQL endpoint and yield batches of flat dictionaries.
Paginates with SPARQL LIMIT/OFFSET on bindings (triple rows), merges
rows into subject documents in a streaming fashion, and stops fetching
once limit subjects have been yielded (when set).
Source code in graflo/data_source/rdf.py
def iter_batches(
    self, batch_size: int = 1000, limit: int | None = None
) -> Iterator[list[dict]]:
    """Query the SPARQL endpoint and yield batches of flat dictionaries.

    Paginates with SPARQL LIMIT/OFFSET on **bindings** (triple rows), merges
    rows into subject documents in a streaming fashion, and stops fetching
    once *limit* subjects have been yielded (when set).
    """
    wrapper = self._create_wrapper()
    offset = 0
    page_size = self.config.page_size

    # Accumulator for the subject currently being assembled. Rows for one
    # subject are contiguous because build_query() adds ORDER BY ?s, so a
    # change in ?s means the previous subject is finished. A subject may
    # still straddle a page boundary, which is why this state lives outside
    # the pagination loop.
    open_uri: str | None = None
    open_doc: dict[str, Any] | None = None
    batch: list[dict] = []
    # Count of *subjects* emitted so far (limit applies to subjects, not
    # triple rows).
    total_emitted = 0

    def subject_completed(doc: dict[str, Any]) -> Iterator[list[dict]]:
        """Append a finished subject and yield when a batch is full."""
        nonlocal batch, total_emitted
        if limit is not None and total_emitted >= limit:
            return
        batch.append(doc)
        total_emitted += 1
        if len(batch) >= batch_size:
            to_send = batch
            batch = []
            yield to_send

    while True:
        # Stop paginating as soon as the subject limit has been reached.
        if limit is not None and total_emitted >= limit:
            break
        query = self.config.build_query(offset=offset, limit=page_size)
        wrapper.setQuery(query)
        logger.debug("SPARQL query (offset=%d): %s", offset, query)
        results = wrapper.queryAndConvert()
        bindings = results.get("results", {}).get("bindings", [])
        if not bindings:
            break
        stop_fetching = False
        for binding in bindings:
            s_val = binding["s"]["value"]
            if open_uri is None:
                # First row of a new subject (start of stream or after a
                # flush).
                open_uri = s_val
                open_doc = {"_uri": s_val, "_key": _local_name(s_val)}
            elif s_val != open_uri:
                # Subject changed: flush the finished document, then start
                # a fresh one for the new subject.
                assert open_doc is not None
                yield from subject_completed(open_doc)
                open_uri = None
                open_doc = None
                if limit is not None and total_emitted >= limit:
                    stop_fetching = True
                    break
                open_uri = s_val
                open_doc = {"_uri": s_val, "_key": _local_name(s_val)}
            assert open_doc is not None
            _merge_sparql_binding_into_doc(open_doc, binding)
        if stop_fetching:
            break
        # A short page means the endpoint has no further rows.
        if len(bindings) < page_size:
            break
        offset += page_size

    # Flush the last in-progress subject (if any, and still under the
    # limit), then the final partial batch.
    if (
        open_doc is not None
        and open_uri is not None
        and (limit is None or total_emitted < limit)
    ):
        batch.append(open_doc)
    if batch:
        yield batch
SparqlSourceConfig
Bases: ConfigBaseModel
Configuration for a SPARQL endpoint data source.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `endpoint_url` | `str` | Full SPARQL query endpoint URL (e.g. `http://localhost:3030/dataset/sparql`). |
| `rdf_class` | `str \| None` | URI of the rdf:Class whose instances to fetch. |
| `graph_uri` | `str \| None` | Named graph to restrict the query to (optional). |
| `sparql_query` | `str \| None` | Custom SPARQL query override (optional). |
| `username` | `str \| None` | HTTP basic-auth username (optional). |
| `password` | `str \| None` | HTTP basic-auth password (optional). |
| `page_size` | `int` | Number of results per SPARQL LIMIT/OFFSET page. |
Source code in graflo/data_source/rdf.py
class SparqlSourceConfig(ConfigBaseModel):
    """Settings for reading from a SPARQL endpoint.

    Attributes:
        endpoint_url: Full SPARQL query endpoint URL
            (e.g. ``http://localhost:3030/dataset/sparql``)
        rdf_class: URI of the rdf:Class whose instances to fetch
        graph_uri: Named graph to restrict the query to (optional)
        sparql_query: Custom SPARQL query override (optional)
        username: HTTP basic-auth username (optional)
        password: HTTP basic-auth password (optional)
        page_size: Number of results per SPARQL LIMIT/OFFSET page
    """

    endpoint_url: str
    rdf_class: str | None = None
    graph_uri: str | None = None
    sparql_query: str | None = None
    username: str | None = None
    password: str | None = None
    page_size: int = Field(default=10_000, description="SPARQL pagination page size")

    def build_query(self, offset: int = 0, limit: int | None = None) -> str:
        """Build a SPARQL SELECT query for one pagination page.

        When ``sparql_query`` is set, that query (minus any trailing
        semicolon) is used as the base. Otherwise a default triple-pattern
        query is generated::

            SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }

        Args:
            offset: SPARQL ``OFFSET`` for pagination.
            limit: SPARQL ``LIMIT``; defaults to ``page_size`` when ``None``.

        Returns:
            The base query with ``ORDER BY ?s`` (unless the base already
            contains an ORDER BY) plus ``LIMIT``/``OFFSET`` appended.
        """
        if self.sparql_query:
            base = self.sparql_query.rstrip().rstrip(";")
        else:
            if self.graph_uri:
                graph_open, graph_close = f"GRAPH <{self.graph_uri}> {{", "}"
            else:
                graph_open, graph_close = "", ""
            class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else ""
            base = (
                "SELECT ?s ?p ?o WHERE { "
                + graph_open
                + " "
                + class_filter
                + "?s ?p ?o . "
                + graph_close
                + " }"
            )
        effective_limit = self.page_size if limit is None else limit
        # Subjects must arrive contiguously for the streaming grouping in
        # SparqlEndpointDataSource, hence ORDER BY ?s unless the base query
        # already orders its results.
        suffix = "" if "ORDER BY" in base.upper() else " ORDER BY ?s"
        return f"{base}{suffix} LIMIT {effective_limit} OFFSET {offset}"
build_query(offset=0, limit=None)
Build a SPARQL SELECT query.
If sparql_query is set it is returned with LIMIT/OFFSET appended.
Otherwise generates::
SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }
Source code in graflo/data_source/rdf.py
def build_query(self, offset: int = 0, limit: int | None = None) -> str:
    """Build a SPARQL SELECT query for one pagination page.

    When ``sparql_query`` is set, that query (minus any trailing semicolon)
    is used as the base. Otherwise a default triple-pattern query is
    generated::

        SELECT ?s ?p ?o WHERE { ?s a <rdf_class> . ?s ?p ?o . }

    Args:
        offset: SPARQL ``OFFSET`` for pagination.
        limit: SPARQL ``LIMIT``; defaults to ``page_size`` when ``None``.

    Returns:
        The base query with ``ORDER BY ?s`` (unless the base already
        contains an ORDER BY) plus ``LIMIT``/``OFFSET`` appended.
    """
    if self.sparql_query:
        base = self.sparql_query.rstrip().rstrip(";")
    else:
        if self.graph_uri:
            graph_open, graph_close = f"GRAPH <{self.graph_uri}> {{", "}"
        else:
            graph_open, graph_close = "", ""
        class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else ""
        base = (
            "SELECT ?s ?p ?o WHERE { "
            + graph_open
            + " "
            + class_filter
            + "?s ?p ?o . "
            + graph_close
            + " }"
        )
    effective_limit = self.page_size if limit is None else limit
    # Subjects must arrive contiguously for the streaming grouping in
    # SparqlEndpointDataSource, hence ORDER BY ?s unless the base query
    # already orders its results.
    suffix = "" if "ORDER BY" in base.upper() else " ORDER BY ?s"
    return f"{base}{suffix} LIMIT {effective_limit} OFFSET {offset}"