ontocast.tool.aggregate

Graph aggregation tools for OntoCast.

This module aggregates RDF graphs produced for individual chunks, disambiguating entities and predicates and ensuring consistent namespace usage across the aggregated graph.

ChunkRDFGraphAggregator

Main class for aggregating and disambiguating chunk graphs.

This class provides functionality for combining RDF graphs from multiple chunks while handling entity and predicate disambiguation. It ensures consistent namespace usage and creates canonical URIs for similar entities and predicates.

Attributes:

    disambiguator (EntityDisambiguator): Entity disambiguator instance for handling entity similarity.
    include_provenance (bool): Whether to include detailed provenance triples.
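
A minimal usage sketch (the empty chunks list is a placeholder; real Chunk objects would come from an upstream chunking step, and the import path follows the module path above):

from ontocast.tool.aggregate import ChunkRDFGraphAggregator

aggregator = ChunkRDFGraphAggregator(
    similarity_threshold=85.0,  # fuzzy-match cutoff for entity labels
    semantic_threshold=90.0,    # stricter cutoff for semantic matches
    include_provenance=True,    # also emit prov:wasGeneratedBy links
)

# Replace with real Chunk objects from the upstream chunking step; each
# chunk carries an RDF graph plus its chunk-local namespace and IRI
# (see the source listing below).
chunks = []

merged = aggregator.aggregate_graphs(
    chunks, doc_namespace="https://example.org/doc/123"
)
print(f"{len(merged)} triples after disambiguation")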

Source code in ontocast/tool/aggregate.py
class ChunkRDFGraphAggregator:
    """Main class for aggregating and disambiguating chunk graphs.

    This class provides functionality for combining RDF graphs from multiple chunks
    while handling entity and predicate disambiguation. It ensures consistent
    namespace usage and creates canonical URIs for similar entities and predicates.

    Attributes:
        disambiguator: Entity disambiguator instance for handling entity similarity.
        include_provenance: Whether to include detailed provenance triples.
    """

    def __init__(
        self,
        similarity_threshold: float = 85.0,
        semantic_threshold: float = 90.0,
        include_provenance: bool = False,
    ):
        """Initialize the chunk RDF graph aggregator.

        Args:
            similarity_threshold: Threshold for considering entities similar (default: 85.0).
            semantic_threshold: Higher threshold for semantic similarity (default: 90.0).
            include_provenance: Whether to include detailed provenance triples (default: False).
        """
        self.disambiguator = EntityDisambiguator(
            similarity_threshold, semantic_threshold
        )
        self.include_provenance = include_provenance

    def aggregate_graphs(self, chunks: list[Chunk], doc_namespace: str) -> RDFGraph:
        """Aggregate multiple chunk graphs with entity and predicate disambiguation.

        This method combines multiple chunk graphs into a single graph while
        handling entity and predicate disambiguation. It creates canonical URIs
        for similar entities and predicates, and ensures consistent namespace usage.

        Args:
            chunks: List of chunks to aggregate.
            doc_namespace: The document IRI to use as base for canonical URIs.

        Returns:
            RDFGraph: Aggregated graph with disambiguated entities and predicates.
        """
        logger.info(f"Aggregating {len(chunks)} chunks for document {doc_namespace}")

        if not chunks:
            return RDFGraph()

        # Initialize aggregated graph
        aggregated_graph = RDFGraph()
        doc_namespace = self._normalize_namespace(doc_namespace)

        # Setup namespaces and collect chunk info
        namespace_info = self._collect_namespace_info(
            chunks, doc_namespace, aggregated_graph
        )

        # Collect and disambiguate entities and predicates
        entity_mapping, predicate_mapping, metadata = self._create_mappings(
            chunks, doc_namespace, namespace_info
        )

        # Add metadata for canonical entities and predicates
        self._add_canonical_metadata(
            aggregated_graph, entity_mapping, predicate_mapping, metadata
        )

        # Process triples from all chunks
        # Type assertion: chunk_namespaces is always a set
        chunk_namespaces = namespace_info["chunk_namespaces"]
        assert isinstance(chunk_namespaces, set), "chunk_namespaces should be a set"

        self._process_chunk_triples(
            chunks,
            aggregated_graph,
            entity_mapping,
            predicate_mapping,
            doc_namespace,
            chunk_namespaces,
        )

        logger.info(
            f"Aggregated {len(chunks)} chunks into graph with {len(aggregated_graph)} triples, "
            f"{len(entity_mapping)} entity mappings, {len(predicate_mapping)} predicate mappings"
        )
        return aggregated_graph

    def _normalize_namespace(self, doc_namespace: str) -> str:
        """Ensure doc_namespace ends with appropriate separator."""
        return (
            doc_namespace if doc_namespace.endswith(("/", "#")) else doc_namespace + "/"
        )

    def _collect_namespace_info(
        self, chunks: list[Chunk], doc_namespace: str, aggregated_graph: RDFGraph
    ) -> dict[str, Union[dict[str, str], set[str]]]:
        """Collect and bind all namespaces from chunks."""
        all_namespaces = {}
        chunk_namespaces = set()
        preferred_namespaces = set()

        for chunk in chunks:
            if chunk.graph is None:
                continue

            chunk_namespaces.add(chunk.namespace)

            for prefix, uri in chunk.graph.namespaces():
                if prefix not in all_namespaces:
                    all_namespaces[prefix] = uri
                elif all_namespaces[prefix] != uri:
                    # Handle prefix conflicts
                    new_prefix = f"{prefix}_{len(all_namespaces)}"
                    all_namespaces[new_prefix] = uri

        # Identify preferred (ontology) namespaces
        for uri in all_namespaces.values():
            uri_str = str(uri)
            if uri_str != doc_namespace and not any(
                uri_str.startswith(ns) for ns in chunk_namespaces
            ):
                preferred_namespaces.add(uri_str)

        # Bind namespaces to aggregated graph
        for prefix, uri in all_namespaces.items():
            aggregated_graph.bind(prefix, uri)
        aggregated_graph.bind("prov", PROV)
        aggregated_graph.bind("cd", doc_namespace)

        return {
            "all_namespaces": all_namespaces,
            "chunk_namespaces": chunk_namespaces,
            "preferred_namespaces": preferred_namespaces,
        }

    def _create_mappings(
        self, chunks: list[Chunk], doc_namespace: str, namespace_info: dict
    ) -> tuple[dict[URIRef, URIRef], dict[URIRef, URIRef], dict]:
        """Create entity and predicate mappings with disambiguation."""

        # Collect all entities and predicates with metadata
        all_entities = {}
        all_predicates = {}
        entity_types = defaultdict(set)

        for chunk in chunks:
            if chunk.graph is None:
                continue

            logger.debug(
                f"Processing chunk {chunk.hid} with namespace {chunk.namespace}"
            )

            # Collect entities and types
            chunk_entities = self.disambiguator.extract_entity_labels(chunk.graph)
            all_entities.update(chunk_entities)

            # Collect type information
            for subj, pred, obj in chunk.graph:
                if (
                    pred == RDF.type
                    and isinstance(subj, URIRef)
                    and isinstance(obj, URIRef)
                ):
                    entity_types[subj].add(obj)

            # Collect predicates
            chunk_predicates = self.disambiguator.extract_predicate_info(chunk.graph)
            self._merge_predicate_info(all_predicates, chunk_predicates)

        # Create similarity groups
        entity_groups = self.disambiguator.find_similar_entities(
            all_entities, entity_types
        )
        predicate_groups = self.disambiguator.find_similar_predicates(all_predicates)

        # Create mappings
        entity_mapping = self._create_entity_mapping(
            entity_groups,
            all_entities,
            doc_namespace,
            namespace_info["preferred_namespaces"],
            namespace_info["chunk_namespaces"],
        )

        predicate_mapping = self._create_predicate_mapping(
            predicate_groups,
            all_predicates,
            doc_namespace,
            namespace_info["chunk_namespaces"],
        )

        metadata = {
            "all_entities": all_entities,
            "all_predicates": all_predicates,
            "entity_types": entity_types,
        }

        return entity_mapping, predicate_mapping, metadata

    def _create_entity_mapping(
        self,
        entity_groups: list[list[URIRef]],
        all_entities: dict[URIRef, EntityMetadata],
        doc_namespace: str,
        preferred_namespaces: set[str],
        chunk_namespaces: set[str],
    ) -> dict[URIRef, URIRef]:
        """Create mapping from original to canonical entity URIs."""
        entity_mapping = {}
        canonical_entities = set()

        # Process similar entity groups
        for group in entity_groups:
            canonical_uri = self.disambiguator.create_canonical_iri(
                group, doc_namespace, all_entities, preferred_namespaces
            )
            canonical_uri = self._ensure_unique_uri(
                canonical_uri, canonical_entities, doc_namespace
            )
            canonical_entities.add(canonical_uri)

            for entity in group:
                entity_mapping[entity] = canonical_uri

        # Process remaining individual entities from chunk namespaces
        for entity in all_entities:
            if entity not in entity_mapping:
                entity_str = str(entity)
                # Only map chunk-local entities to document namespace
                if any(entity_str.startswith(ns) for ns in chunk_namespaces):
                    local_name = self._clean_name(
                        all_entities[entity].local_name or "entity"
                    )
                    canonical_uri = URIRef(f"{doc_namespace}{local_name}")
                    canonical_uri = self._ensure_unique_uri(
                        canonical_uri, canonical_entities, doc_namespace
                    )
                    canonical_entities.add(canonical_uri)
                    entity_mapping[entity] = canonical_uri

        return entity_mapping

    def _create_predicate_mapping(
        self,
        predicate_groups: list[list[URIRef]],
        all_predicates: dict[URIRef, PredicateMetadata],
        doc_namespace: str,
        chunk_namespaces: set[str],
    ) -> dict[URIRef, URIRef]:
        """Create mapping from original to canonical predicate URIs."""
        predicate_mapping = {}
        canonical_predicates = set()

        # Process similar predicate groups
        for group in predicate_groups:
            canonical_uri = self.disambiguator.create_canonical_predicate(
                group, doc_namespace, all_predicates
            )
            canonical_uri = self._ensure_unique_uri(
                canonical_uri, canonical_predicates, doc_namespace
            )
            canonical_predicates.add(canonical_uri)

            for predicate in group:
                predicate_mapping[predicate] = canonical_uri

        # Process remaining individual predicates from chunk namespaces
        for predicate in all_predicates:
            if predicate not in predicate_mapping:
                predicate_str = str(predicate)
                if any(predicate_str.startswith(ns) for ns in chunk_namespaces):
                    local_name = self._clean_name(
                        all_predicates[predicate].local_name or "predicate"
                    )
                    canonical_uri = URIRef(f"{doc_namespace}{local_name}")
                    canonical_uri = self._ensure_unique_uri(
                        canonical_uri, canonical_predicates, doc_namespace
                    )
                    canonical_predicates.add(canonical_uri)
                    predicate_mapping[predicate] = canonical_uri

        return predicate_mapping

    def _ensure_unique_uri(
        self, uri: URIRef, existing: set[URIRef], namespace: str
    ) -> URIRef:
        """Ensure URI uniqueness by appending counter if needed."""
        base_uri = uri
        counter = 1
        while uri in existing:
            local_name = str(base_uri).split(namespace)[-1]
            uri = URIRef(f"{namespace}{local_name}_{counter}")
            counter += 1
        return uri

    def _clean_name(self, name: str) -> str:
        """Clean name for use in URIs."""
        import re

        cleaned = re.sub(r"[^\w\-.]", "_", name)
        cleaned = re.sub(r"_+", "_", cleaned).strip("_")
        return cleaned or "entity"

    def _merge_predicate_info(
        self,
        target: dict[URIRef, PredicateMetadata],
        source: dict[URIRef, PredicateMetadata],
    ) -> None:
        """Merge predicate information, preferring more complete data."""
        for pred, info in source.items():
            if pred not in target:
                target[pred] = info
            else:
                existing = target[pred]
                for attr in ["label", "comment", "domain", "range"]:
                    existing_val = getattr(existing, attr)
                    new_val = getattr(info, attr)

                    if existing_val is None and new_val is not None:
                        setattr(existing, attr, new_val)
                    elif (
                        existing_val is not None
                        and new_val is not None
                        and isinstance(new_val, str)
                        and len(new_val) > len(str(existing_val))
                    ):
                        setattr(existing, attr, new_val)

                if info.is_explicit_property:
                    existing.is_explicit_property = True

    def _process_chunk_triples(
        self,
        chunks: list[Chunk],
        aggregated_graph: RDFGraph,
        entity_mapping: dict[URIRef, URIRef],
        predicate_mapping: dict[URIRef, URIRef],
        doc_namespace: str,
        chunk_namespaces: set[str],
    ) -> None:
        """Process triples from all chunks with disambiguation."""
        for chunk in chunks:
            if chunk.graph is None:
                continue

            chunk_iri = URIRef(chunk.iri)

            # Add minimal provenance if requested
            if self.include_provenance:
                aggregated_graph.add((chunk_iri, RDF.type, PROV.Entity))
                aggregated_graph.add(
                    (chunk_iri, PROV.wasPartOf, URIRef(doc_namespace.rstrip("#/")))
                )

            # Process triples with mapping
            for subj, pred, obj in chunk.graph:
                # Skip chunk metadata triples
                if subj == chunk_iri:
                    continue

                # Apply mappings based on namespace
                new_subj = (
                    self._apply_mapping(subj, entity_mapping, chunk_namespaces)
                    if isinstance(subj, (URIRef, Literal))
                    else subj
                )
                new_pred = (
                    self._apply_mapping(pred, predicate_mapping, chunk_namespaces)
                    if isinstance(pred, (URIRef, Literal))
                    else pred
                )

                # Special handling for rdf:type objects (preserve ontology classes)
                if new_pred == RDF.type and isinstance(obj, URIRef):
                    new_obj = obj  # Keep ontology classes unchanged
                else:
                    new_obj = (
                        self._apply_mapping(obj, entity_mapping, chunk_namespaces)
                        if isinstance(obj, (URIRef, Literal))
                        else obj
                    )

                aggregated_graph.add((new_subj, new_pred, new_obj))

                # Add selective provenance
                if (
                    self.include_provenance
                    and isinstance(new_subj, URIRef)
                    and str(new_subj).startswith(doc_namespace)
                ):
                    aggregated_graph.add((new_subj, PROV.wasGeneratedBy, chunk_iri))

    def _apply_mapping(
        self,
        uri: Union[URIRef, Literal],
        mapping: dict[URIRef, URIRef],
        chunk_namespaces: set[str],
    ) -> Union[URIRef, Literal]:
        """Apply mapping only if URI is from chunk namespace."""
        if not isinstance(uri, URIRef):
            return uri

        uri_str = str(uri)
        if any(uri_str.startswith(ns) for ns in chunk_namespaces):
            return mapping.get(uri, uri)
        return uri

    def _add_canonical_metadata(
        self,
        graph: RDFGraph,
        entity_mapping: dict[URIRef, URIRef],
        predicate_mapping: dict[URIRef, URIRef],
        metadata: dict,
    ) -> None:
        """Add metadata for canonical entities and predicates."""
        all_entities = metadata["all_entities"]
        all_predicates = metadata["all_predicates"]
        entity_types = metadata["entity_types"]

        # Group entities by their canonical form
        canonical_to_originals = defaultdict(list)
        for original, canonical in entity_mapping.items():
            canonical_to_originals[canonical].append(original)

        # Add metadata for grouped entities
        for canonical, originals in canonical_to_originals.items():
            self._add_entity_metadata(
                graph, canonical, originals, all_entities, entity_types
            )

        # Add metadata for individual entities
        processed_entities = set(entity_mapping.keys())
        for entity in all_entities:
            if entity not in processed_entities:
                self._add_individual_entity_metadata(
                    graph, entity, all_entities, entity_types
                )

        # Similar process for predicates
        canonical_pred_to_originals = defaultdict(list)
        doc_namespace = graph.namespace_manager.store.namespace("cd")

        for original, canonical in predicate_mapping.items():
            if doc_namespace is not None and str(canonical).startswith(
                str(doc_namespace)
            ):
                canonical_pred_to_originals[canonical].append(original)

        for canonical, originals in canonical_pred_to_originals.items():
            merged_info = self._get_merged_predicate_info(originals, all_predicates)
            self._add_predicate_metadata(graph, canonical, merged_info)

        # Add metadata for individual predicates
        processed_predicates = set(predicate_mapping.keys())
        for predicate, info in all_predicates.items():
            if (
                predicate not in processed_predicates
                and doc_namespace is not None
                and str(predicate).startswith(str(doc_namespace))
            ):
                self._add_predicate_metadata(graph, predicate, info)

    def _add_entity_metadata(
        self,
        graph: RDFGraph,
        canonical: URIRef,
        originals: list[URIRef],
        all_entities: dict[URIRef, EntityMetadata],
        entity_types: dict[URIRef, set[URIRef]],
    ) -> None:
        """Add metadata for a canonical entity."""
        # Best label from the group
        best_label = self._get_best_label(
            [all_entities.get(orig) for orig in originals]
        )
        if best_label:
            graph.add((canonical, RDFS.label, Literal(best_label)))

        # Collect all types
        all_types = set()
        for orig in originals:
            all_types.update(entity_types.get(orig, set()))

        for type_uri in all_types:
            graph.add((canonical, RDF.type, type_uri))

        # Link to ontology instances
        doc_namespace = graph.namespace_manager.store.namespace("cd")
        for orig in originals:
            if doc_namespace is not None and not str(orig).startswith(
                str(doc_namespace)
            ):
                graph.add((canonical, OWL.sameAs, orig))

    def _add_individual_entity_metadata(
        self,
        graph: RDFGraph,
        entity: URIRef,
        all_entities: dict[URIRef, EntityMetadata],
        entity_types: dict[URIRef, set[URIRef]],
    ) -> None:
        """Add metadata for an individual entity."""
        if entity in all_entities and all_entities[entity].label:
            graph.add((entity, RDFS.label, Literal(all_entities[entity].label)))

        for type_uri in entity_types.get(entity, set()):
            graph.add((entity, RDF.type, type_uri))

    def _add_predicate_metadata(
        self, graph: RDFGraph, predicate: URIRef, info: PredicateMetadata
    ) -> None:
        """Add metadata for a predicate."""
        if info.label:
            graph.add((predicate, RDFS.label, Literal(info.label)))
        if info.comment:
            graph.add((predicate, RDFS.comment, Literal(info.comment)))
        if info.domain:
            graph.add((predicate, RDFS.domain, info.domain))
        if info.range:
            graph.add((predicate, RDFS.range, info.range))
        if info.is_explicit_property:
            graph.add((predicate, RDF.type, RDF.Property))

    def _get_best_label(self, metadata_list: list[EntityMetadata | None]) -> str | None:
        """Get the best label from a list of entity metadata."""
        labels = [m.label for m in metadata_list if m and m.label]
        return max(labels, key=len) if labels else None

    def _get_merged_predicate_info(
        self, originals: list[URIRef], all_predicates: dict[URIRef, PredicateMetadata]
    ) -> PredicateMetadata:
        """Merge predicate information from multiple sources."""
        merged = PredicateMetadata(local_name="", is_explicit_property=False)

        for pred in originals:
            info = all_predicates.get(pred)
            if not info:
                continue

            for attr in ["label", "comment", "domain", "range"]:
                current = getattr(merged, attr)
                new_val = getattr(info, attr)

                if current is None and new_val is not None:
                    setattr(merged, attr, new_val)
                elif (
                    current is not None
                    and new_val is not None
                    and isinstance(new_val, str)
                    and len(new_val) > len(str(current))
                ):
                    setattr(merged, attr, new_val)

            if info.is_explicit_property:
                merged.is_explicit_property = True

        return merged

__init__(similarity_threshold=85.0, semantic_threshold=90.0, include_provenance=False)

Initialize the chunk RDF graph aggregator.

Parameters:

    similarity_threshold (float, default 85.0): Threshold for considering entities similar.
    semantic_threshold (float, default 90.0): Higher threshold for semantic similarity.
    include_provenance (bool, default False): Whether to include detailed provenance triples.
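
For stricter merging, both thresholds can be raised; judging by the defaults, they appear to be on a 0-100 similarity scale (a sketch, not prescribed usage):

from ontocast.tool.aggregate import ChunkRDFGraphAggregator

# Only near-identical labels are grouped; provenance triples are omitted.
strict_aggregator = ChunkRDFGraphAggregator(
    similarity_threshold=95.0,
    semantic_threshold=98.0,
    include_provenance=False,
)
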
Source code in ontocast/tool/aggregate.py
def __init__(
    self,
    similarity_threshold: float = 85.0,
    semantic_threshold: float = 90.0,
    include_provenance: bool = False,
):
    """Initialize the chunk RDF graph aggregator.

    Args:
        similarity_threshold: Threshold for considering entities similar (default: 85.0).
        semantic_threshold: Higher threshold for semantic similarity (default: 90.0).
        include_provenance: Whether to include detailed provenance triples (default: False).
    """
    self.disambiguator = EntityDisambiguator(
        similarity_threshold, semantic_threshold
    )
    self.include_provenance = include_provenance

aggregate_graphs(chunks, doc_namespace)

Aggregate multiple chunk graphs with entity and predicate disambiguation.

This method combines multiple chunk graphs into a single graph while handling entity and predicate disambiguation. It creates canonical URIs for similar entities and predicates, and ensures consistent namespace usage.

Parameters:

    chunks (list[Chunk], required): List of chunks to aggregate.
    doc_namespace (str, required): The document IRI to use as base for canonical URIs.

Returns:

    RDFGraph: Aggregated graph with disambiguated entities and predicates.
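
A sketch of a typical call (assuming RDFGraph supports rdflib-style serialization, which the use of bind() and namespaces() in the source below suggests; the empty chunks list is a placeholder):

from ontocast.tool.aggregate import ChunkRDFGraphAggregator

aggregator = ChunkRDFGraphAggregator()
chunks = []  # real Chunk objects would come from the chunking step

# A doc_namespace without a trailing "/" or "#" is normalized internally,
# so canonical URIs take the form https://example.org/doc/42/<name>.
graph = aggregator.aggregate_graphs(chunks, "https://example.org/doc/42")
print(graph.serialize(format="turtle"))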

Source code in ontocast/tool/aggregate.py
def aggregate_graphs(self, chunks: list[Chunk], doc_namespace: str) -> RDFGraph:
    """Aggregate multiple chunk graphs with entity and predicate disambiguation.

    This method combines multiple chunk graphs into a single graph while
    handling entity and predicate disambiguation. It creates canonical URIs
    for similar entities and predicates, and ensures consistent namespace usage.

    Args:
        chunks: List of chunks to aggregate.
        doc_namespace: The document IRI to use as base for canonical URIs.

    Returns:
        RDFGraph: Aggregated graph with disambiguated entities and predicates.
    """
    logger.info(f"Aggregating {len(chunks)} chunks for document {doc_namespace}")

    if not chunks:
        return RDFGraph()

    # Initialize aggregated graph
    aggregated_graph = RDFGraph()
    doc_namespace = self._normalize_namespace(doc_namespace)

    # Setup namespaces and collect chunk info
    namespace_info = self._collect_namespace_info(
        chunks, doc_namespace, aggregated_graph
    )

    # Collect and disambiguate entities and predicates
    entity_mapping, predicate_mapping, metadata = self._create_mappings(
        chunks, doc_namespace, namespace_info
    )

    # Add metadata for canonical entities and predicates
    self._add_canonical_metadata(
        aggregated_graph, entity_mapping, predicate_mapping, metadata
    )

    # Process triples from all chunks
    # Type assertion: chunk_namespaces is always a set
    chunk_namespaces = namespace_info["chunk_namespaces"]
    assert isinstance(chunk_namespaces, set), "chunk_namespaces should be a set"

    self._process_chunk_triples(
        chunks,
        aggregated_graph,
        entity_mapping,
        predicate_mapping,
        doc_namespace,
        chunk_namespaces,
    )

    logger.info(
        f"Aggregated {len(chunks)} chunks into graph with {len(aggregated_graph)} triples, "
        f"{len(entity_mapping)} entity mappings, {len(predicate_mapping)} predicate mappings"
    )
    return aggregated_graph