
graflo.hq.graph_engine

Graph engine for orchestrating schema inference, connector creation, and ingestion.

This module provides the GraphEngine class which serves as the main orchestrator for graph database operations, coordinating between inference, connector mapping, and data ingestion.

GraphEngine

Orchestrator for graph database operations.

GraphEngine coordinates schema inference, connector creation, schema definition, and data ingestion, providing a unified interface for working with graph databases.

The typical workflow is:

1. infer_schema() - Infer schema from source database (if possible)
2. create_bindings() - Create bindings mapping resources to data sources (if possible)
3. define_schema() - Define schema in target database (if possible and necessary)
4. ingest() - Ingest data into the target database
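
A minimal end-to-end sketch of this workflow (illustrative only: `pg_cfg` and `target_cfg` stand in for pre-built `PostgresConfig` and `DBConfig` instances; the import path follows this page's module path):

```python
from graflo.hq.graph_engine import GraphEngine

engine = GraphEngine()  # target flavor defaults to DBType.ARANGO

# infer_manifest() covers steps 1-2: it infers the schema and creates bindings
manifest = engine.infer_manifest(postgres_config=pg_cfg)

# define_and_ingest() chains steps 3-4: schema definition, then ingestion
engine.define_and_ingest(manifest=manifest, target_db_config=target_cfg)
```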

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `target_db_flavor` | `DBType` | Target database flavor for schema sanitization |
| `resource_mapper` | `ResourceMapper` | ResourceMapper instance for connector creation |

Source code in graflo/hq/graph_engine.py
class GraphEngine:
    """Orchestrator for graph database operations.

    GraphEngine coordinates schema inference, connector creation, schema definition,
    and data ingestion, providing a unified interface for working with graph databases.

    The typical workflow is:
    1. infer_schema() - Infer schema from source database (if possible)
    2. create_bindings() - Create bindings mapping resources to data sources (if possible)
    3. define_schema() - Define schema in target database (if possible and necessary)
    4. ingest() - Ingest data into the target database

    Attributes:
        target_db_flavor: Target database flavor for schema sanitization
        resource_mapper: ResourceMapper instance for connector creation
    """

    def __init__(
        self,
        target_db_flavor: DBType = DBType.ARANGO,
    ):
        """Initialize the GraphEngine.

        Args:
            target_db_flavor: Target database flavor for schema sanitization
        """
        self.target_db_flavor = target_db_flavor
        self.resource_mapper = ResourceMapper()
        self.connection_provider: ConnectionProvider = EmptyConnectionProvider()

    def introspect(
        self,
        postgres_config: PostgresConfig,
        schema_name: str | None = None,
        include_raw_tables: bool = True,
    ) -> SchemaIntrospectionResult:
        """Introspect PostgreSQL schema and return a serializable result.

        Args:
            postgres_config: PostgresConfig instance
            schema_name: Schema name to introspect (defaults to config schema_name or 'public')
            include_raw_tables: If True, include raw (unclassified) tables in the result

        Returns:
            SchemaIntrospectionResult: Introspection result (vertex_tables, edge_tables,
                raw_tables, schema_name) suitable for serialization.
        """
        with PostgresConnection(postgres_config) as postgres_conn:
            inferencer = SQLInferenceManager(
                conn=postgres_conn,
                target_db_flavor=self.target_db_flavor,
            )
            return inferencer.introspect(
                schema_name=schema_name,
                include_raw_tables=include_raw_tables,
            )

    def infer_manifest(
        self,
        postgres_config: PostgresConfig,
        schema_name: str | None = None,
        fuzzy_threshold: float = 0.8,
        discard_disconnected_vertices: bool = False,
    ) -> GraphManifest:
        """Infer a GraphManifest from PostgreSQL database.

        Args:
            postgres_config: PostgresConfig instance
            schema_name: Schema name to introspect (defaults to config schema_name or 'public')
            fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8)
            discard_disconnected_vertices: If True, remove vertices that do not take part in
                any relation (and resources/actors that reference them). Default False.

        Returns:
            GraphManifest: Inferred manifest with schema, ingestion model, and bindings.
        """
        with PostgresConnection(postgres_config) as postgres_conn:
            inferencer = SQLInferenceManager(
                conn=postgres_conn,
                target_db_flavor=self.target_db_flavor,
                fuzzy_threshold=fuzzy_threshold,
            )
            artifacts = inferencer.infer_artifacts(schema_name=schema_name)
            schema, ingestion_model = artifacts.schema, artifacts.ingestion_model
            bindings, provider = (
                self.resource_mapper.create_bindings_with_provider_from_introspection(
                    introspection_result=artifacts.introspection_result,
                    conn=postgres_conn,
                    schema_name=schema_name,
                )
            )
            self.connection_provider = provider
        if discard_disconnected_vertices:
            disconnected = schema.remove_disconnected_vertices()
            ingestion_model.prune_to_graph(
                schema.core_schema, disconnected=disconnected
            )
            connected_resources = {
                resource.name for resource in ingestion_model.resources
            }
            connectors = list(bindings.connectors)
            resource_connector = list(bindings.resource_connector)
            connector_connection = list(bindings.connector_connection)

            connector_refs_all = set()
            for connector in connectors:
                connector_refs_all.add(connector.hash)
                if connector.name:
                    connector_refs_all.add(connector.name)
            filtered_resource_connector = []
            mapped_connector_refs = set()
            for mapping in resource_connector:
                if isinstance(mapping, dict):
                    resource_name = mapping.get("resource")
                    connector_ref = mapping.get("connector")
                else:
                    resource_name = mapping.resource
                    connector_ref = mapping.connector
                if (
                    resource_name in connected_resources
                    and isinstance(connector_ref, str)
                    and connector_ref in connector_refs_all
                ):
                    filtered_resource_connector.append(mapping)
                    mapped_connector_refs.add(connector_ref)
            filtered_connectors = [
                connector
                for connector in connectors
                if connector.resource_name in connected_resources
                or connector.hash in mapped_connector_refs
                or (
                    connector.name is not None
                    and connector.name in mapped_connector_refs
                )
            ]
            valid_connector_refs = set()
            for connector in filtered_connectors:
                valid_connector_refs.add(connector.hash)
                if connector.name:
                    valid_connector_refs.add(connector.name)
            filtered_connector_connection = []
            for mapping in connector_connection:
                if isinstance(mapping, dict):
                    connector_ref = mapping.get("connector")
                else:
                    connector_ref = mapping.connector
                if connector_ref in valid_connector_refs:
                    filtered_connector_connection.append(mapping)
            bindings_dict = bindings.to_dict(skip_defaults=False)
            bindings_dict["connectors"] = filtered_connectors
            bindings_dict["resource_connector"] = filtered_resource_connector
            bindings_dict["connector_connection"] = filtered_connector_connection
            bindings = Bindings.from_dict(bindings_dict)
        manifest = GraphManifest(
            graph_schema=schema, ingestion_model=ingestion_model, bindings=bindings
        )
        # Apply DB-flavor-specific sanitization a posteriori (reserved words,
        # TigerGraph identity normalization, etc.). Sanitizer is the single
        # entry point that maps `target_db_flavor` to the corresponding
        # evolution ops.
        Sanitizer(self.target_db_flavor).sanitize_manifest(manifest)
        return manifest

    def create_bindings(
        self,
        postgres_config: PostgresConfig,
        schema_name: str | None = None,
        datetime_columns: dict[str, str] | None = None,
        type_lookup_overrides: dict[str, dict] | None = None,
        include_raw_tables: bool = False,
    ) -> Bindings:
        """Create Bindings from PostgreSQL tables.

        Args:
            postgres_config: PostgresConfig instance
            schema_name: Schema name to introspect
            datetime_columns: Optional mapping of resource/table name to datetime
                column name for date-range filtering (sets date_field per
                TableConnector). Use with IngestionParams.datetime_after /
                datetime_before.
            type_lookup_overrides: Optional mapping of table name to type_lookup
                spec for edge tables where source/target types come from a
                lookup table. Each value: {table, identity, type_column,
                source, target, relation?}.
            include_raw_tables: If True, also create connectors for raw
                (unclassified) tables

        Returns:
            Bindings: Bindings object with TableConnector instances for all tables
        """
        with PostgresConnection(postgres_config) as postgres_conn:
            bindings, provider = (
                self.resource_mapper.create_bindings_with_provider_from_postgres(
                    conn=postgres_conn,
                    schema_name=schema_name,
                    datetime_columns=datetime_columns,
                    type_lookup_overrides=type_lookup_overrides,
                    include_raw_tables=include_raw_tables,
                )
            )
        self.connection_provider = provider
        return bindings

    def define_schema(
        self,
        manifest: GraphManifest,
        target_db_config: DBConfig,
        recreate_schema: bool = False,
        graph_target_namespace: str | None = None,
    ) -> None:
        """Define schema in the target database.

        This method handles database/schema creation and initialization.
        Some databases don't require explicit schema definition (e.g., Neo4j),
        but this method ensures the database is properly initialized.

        If the schema/graph already exists and recreate_schema is False (default),
        init_db raises SchemaExistsError and the script halts.

        Args:
            manifest: GraphManifest with schema block.
            target_db_config: Target database connection configuration
            recreate_schema: If True, drop existing schema and define new one.
                If False and schema/graph already exists, raises SchemaExistsError.
            graph_target_namespace: Optional target graph/database/space name (e.g. temp
                schema). Overrides ``schema.db_profile.target_namespace`` and defaults
                ahead of ``schema.metadata.name`` when the config omits the namespace.
        """
        schema = manifest.require_schema()

        _ensure_graph_target_namespace(schema, target_db_config, graph_target_namespace)

        # Ensure schema reflects target DB so finish_init applies DB-specific defaults.
        schema.db_profile.db_flavor = target_db_config.connection_type
        schema.finish_init()

        # Initialize database with schema definition
        # init_db() handles database/schema creation automatically
        # It checks if the database exists and creates it if needed
        with ConnectionManager(connection_config=target_db_config) as db_client:
            db_client.init_db(schema, recreate_schema)

    def define_and_ingest(
        self,
        manifest: GraphManifest,
        target_db_config: DBConfig,
        ingestion_params: IngestionParams | None = None,
        connection_provider: ConnectionProvider | None = None,
        recreate_schema: bool | None = None,
        clear_data: bool | None = None,
        graph_target_namespace: str | None = None,
    ) -> None:
        """Define schema and ingest data into the graph database in one operation.

        This is a convenience method that chains define_schema() and ingest().
        It's the recommended way to set up and populate a graph database.

        Args:
            manifest: GraphManifest with schema/ingestion/bindings blocks.
            target_db_config: Target database connection configuration
            ingestion_params: IngestionParams instance with ingestion configuration.
                If None, uses default IngestionParams()
            connection_provider: Optional ConnectionProvider for resolving connector
                connections. If None, falls back to the engine's connection_provider.
            recreate_schema: If True, drop existing schema and define new one.
                If None, defaults to False. When False and schema already exists,
                define_schema raises SchemaExistsError and the script halts.
            clear_data: If True, remove existing data before ingestion (schema unchanged).
                If None, uses ingestion_params.clear_data.
            graph_target_namespace: Optional target graph/database/space name; passed
                to both ``define_schema`` and ``ingest`` for consistent resolution.
        """
        ingestion_params = ingestion_params or IngestionParams()
        if clear_data is None:
            clear_data = ingestion_params.clear_data
        if recreate_schema is None:
            recreate_schema = False

        # Define schema first (halts with SchemaExistsError if schema exists and recreate_schema is False)
        self.define_schema(
            manifest=manifest,
            target_db_config=target_db_config,
            recreate_schema=recreate_schema,
            graph_target_namespace=graph_target_namespace,
        )

        # Then ingest data (clear_data is applied inside ingest() when ingestion_params.clear_data)
        ingestion_params = ingestion_params.model_copy(
            update={"clear_data": clear_data}
        )
        self.ingest(
            manifest=manifest,
            target_db_config=target_db_config,
            ingestion_params=ingestion_params,
            connection_provider=connection_provider,
            graph_target_namespace=graph_target_namespace,
        )

    def ingest(
        self,
        manifest: GraphManifest,
        target_db_config: DBConfig,
        ingestion_params: IngestionParams | None = None,
        connection_provider: ConnectionProvider | None = None,
        graph_target_namespace: str | None = None,
    ) -> None:
        """Ingest data into the graph database.

        If ingestion_params.clear_data is True, removes all existing data
        (without touching the schema) before ingestion.

        Args:
            manifest: GraphManifest with schema/ingestion/bindings blocks.
            target_db_config: Target database connection configuration
            ingestion_params: IngestionParams instance with ingestion configuration.
                If None, uses default IngestionParams()
            connection_provider: Optional ConnectionProvider for resolving connector
                connections. If None, falls back to the engine's connection_provider.
            graph_target_namespace: Same semantics as ``define_schema``; use when
                calling ``ingest`` without a prior ``define_schema`` on this config.
        """
        schema = manifest.require_schema()
        ingestion_model = manifest.require_ingestion_model()
        bindings = manifest.bindings

        _ensure_graph_target_namespace(schema, target_db_config, graph_target_namespace)

        ingestion_params = ingestion_params or IngestionParams()
        if ingestion_params.clear_data:
            with ConnectionManager(connection_config=target_db_config) as db_client:
                clear_result = db_client.clear_data(schema)
                if inspect.isawaitable(clear_result):
                    raise TypeError(
                        "clear_data must be synchronous so ingestion only starts "
                        "after data clearing has completed."
                    )

        caster = Caster(
            schema=schema,
            ingestion_model=ingestion_model,
            ingestion_params=ingestion_params,
        )
        caster.ingest(
            target_db_config=target_db_config,
            bindings=bindings or Bindings(),
            ingestion_params=ingestion_params,
            connection_provider=connection_provider or self.connection_provider,
        )

    # ------------------------------------------------------------------
    # RDF / SPARQL inference
    # ------------------------------------------------------------------

    def infer_schema_from_rdf(
        self,
        source: str | Path,
        *,
        endpoint_url: str | None = None,
        graph_uri: str | None = None,
        schema_name: str | None = None,
    ) -> tuple[Schema, IngestionModel]:
        """Infer a graflo Schema from an RDF / OWL ontology.

        Reads the TBox (class and property declarations) and produces
        vertices (from ``owl:Class``), fields (from ``owl:DatatypeProperty``),
        and edges (from ``owl:ObjectProperty`` with domain/range).

        Args:
            source: Path to an RDF file (e.g. ``ontology.ttl``) or a base
                URL when using *endpoint_url*.
            endpoint_url: Optional SPARQL endpoint to CONSTRUCT the
                ontology from.
            graph_uri: Named graph containing the ontology.
            schema_name: Name for the resulting schema.

        Returns:
            tuple[Schema, IngestionModel]: fully initialised schema and ingestion model.
        """
        from graflo.hq.rdf_inferencer import RdfInferenceManager

        mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor)
        return mgr.infer_schema(
            source,
            endpoint_url=endpoint_url,
            graph_uri=graph_uri,
            schema_name=schema_name,
        )

    def create_bindings_from_rdf(
        self,
        source: str | Path,
        *,
        endpoint_url: str | None = None,
        graph_uri: str | None = None,
        sparql_config: SparqlEndpointConfig | None = None,
    ) -> Bindings:
        """Create :class:`Bindings` from an RDF ontology.

        One :class:`SparqlConnector` is created per ``owl:Class`` found in the
        ontology.

        Args:
            source: Path to an RDF file or base URL.
            endpoint_url: SPARQL endpoint for the *data* (ABox).
            graph_uri: Named graph containing the data.
            sparql_config: Optional :class:`SparqlEndpointConfig` to attach
                to the resulting connectors for authentication.

        Returns:
            Bindings with SPARQL connectors for each class.
        """
        from graflo.hq.rdf_inferencer import RdfInferenceManager

        mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor)
        bindings = mgr.create_bindings(
            source,
            endpoint_url=endpoint_url,
            graph_uri=graph_uri,
        )

        if sparql_config:
            conn_proxy = "sparql_source"
            provider = InMemoryConnectionProvider()
            provider.register_generalized_config(
                conn_proxy=conn_proxy,
                config=SparqlGeneralizedConnConfig(config=sparql_config),
            )
            provider.default_sparql = sparql_config

            # Wire all SPARQL connectors to the same credential proxy.
            from graflo.architecture.contract.bindings import SparqlConnector

            for connector in bindings.connectors:
                if not isinstance(connector, SparqlConnector):
                    continue
                bindings.bind_connector_to_conn_proxy(connector, conn_proxy)
                provider.bind_connector_to_conn_proxy(
                    connector=connector, conn_proxy=conn_proxy
                )
        else:
            provider = EmptyConnectionProvider()
        self.connection_provider = provider
        return bindings

__init__(target_db_flavor=DBType.ARANGO)

Initialize the GraphEngine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `target_db_flavor` | `DBType` | Target database flavor for schema sanitization | `ARANGO` |
Source code in graflo/hq/graph_engine.py
def __init__(
    self,
    target_db_flavor: DBType = DBType.ARANGO,
):
    """Initialize the GraphEngine.

    Args:
        target_db_flavor: Target database flavor for schema sanitization
    """
    self.target_db_flavor = target_db_flavor
    self.resource_mapper = ResourceMapper()
    self.connection_provider: ConnectionProvider = EmptyConnectionProvider()
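
A minimal construction note (import path taken from this page):

```python
from graflo.hq.graph_engine import GraphEngine

engine = GraphEngine()  # equivalent to GraphEngine(target_db_flavor=DBType.ARANGO)
# A fresh engine holds an EmptyConnectionProvider until an inference/binding
# method stores a real provider on engine.connection_provider.
print(engine.target_db_flavor)
```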

create_bindings(postgres_config, schema_name=None, datetime_columns=None, type_lookup_overrides=None, include_raw_tables=False)

Create Bindings from PostgreSQL tables.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `postgres_config` | `PostgresConfig` | PostgresConfig instance | required |
| `schema_name` | `str \| None` | Schema name to introspect | `None` |
| `datetime_columns` | `dict[str, str] \| None` | Optional mapping of resource/table name to datetime column name for date-range filtering (sets date_field per TableConnector). Use with IngestionParams.datetime_after / datetime_before. | `None` |
| `type_lookup_overrides` | `dict[str, dict] \| None` | Optional mapping of table name to type_lookup spec for edge tables where source/target types come from a lookup table. Each value: {table, identity, type_column, source, target, relation?}. | `None` |
| `include_raw_tables` | `bool` | If True, also create connectors for raw (unclassified) tables | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Bindings` | `Bindings` | Bindings object with TableConnector instances for all tables |

Source code in graflo/hq/graph_engine.py
def create_bindings(
    self,
    postgres_config: PostgresConfig,
    schema_name: str | None = None,
    datetime_columns: dict[str, str] | None = None,
    type_lookup_overrides: dict[str, dict] | None = None,
    include_raw_tables: bool = False,
) -> Bindings:
    """Create Bindings from PostgreSQL tables.

    Args:
        postgres_config: PostgresConfig instance
        schema_name: Schema name to introspect
        datetime_columns: Optional mapping of resource/table name to datetime
            column name for date-range filtering (sets date_field per
            TableConnector). Use with IngestionParams.datetime_after /
            datetime_before.
        type_lookup_overrides: Optional mapping of table name to type_lookup
            spec for edge tables where source/target types come from a
            lookup table. Each value: {table, identity, type_column,
            source, target, relation?}.
        include_raw_tables: If True, also create connectors for raw
            (unclassified) tables

    Returns:
        Bindings: Bindings object with TableConnector instances for all tables
    """
    with PostgresConnection(postgres_config) as postgres_conn:
        bindings, provider = (
            self.resource_mapper.create_bindings_with_provider_from_postgres(
                conn=postgres_conn,
                schema_name=schema_name,
                datetime_columns=datetime_columns,
                type_lookup_overrides=type_lookup_overrides,
                include_raw_tables=include_raw_tables,
            )
        )
    self.connection_provider = provider
    return bindings
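
A hedged usage sketch: the table and column names (`events`, `created_at`, `links`, `entities`, …) are hypothetical, and the override dict follows the shape documented above:

```python
bindings = engine.create_bindings(
    postgres_config=pg_cfg,  # pre-built PostgresConfig
    schema_name="public",
    # date-range filtering: sets date_field on the "events" TableConnector
    datetime_columns={"events": "created_at"},
    # edge table whose source/target vertex types come from a lookup table
    type_lookup_overrides={
        "links": {
            "table": "entities",
            "identity": "id",
            "type_column": "entity_type",
            "source": "src_id",
            "target": "dst_id",
        }
    },
)
```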

create_bindings_from_rdf(source, *, endpoint_url=None, graph_uri=None, sparql_config=None)

Create Bindings from an RDF ontology.

One SparqlConnector is created per owl:Class found in the ontology.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str \| Path` | Path to an RDF file or base URL. | required |
| `endpoint_url` | `str \| None` | SPARQL endpoint for the data (ABox). | `None` |
| `graph_uri` | `str \| None` | Named graph containing the data. | `None` |
| `sparql_config` | `SparqlEndpointConfig \| None` | Optional SparqlEndpointConfig to attach to the resulting connectors for authentication. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Bindings` | Bindings with SPARQL connectors for each class. |

Source code in graflo/hq/graph_engine.py
def create_bindings_from_rdf(
    self,
    source: str | Path,
    *,
    endpoint_url: str | None = None,
    graph_uri: str | None = None,
    sparql_config: SparqlEndpointConfig | None = None,
) -> Bindings:
    """Create :class:`Bindings` from an RDF ontology.

    One :class:`SparqlConnector` is created per ``owl:Class`` found in the
    ontology.

    Args:
        source: Path to an RDF file or base URL.
        endpoint_url: SPARQL endpoint for the *data* (ABox).
        graph_uri: Named graph containing the data.
        sparql_config: Optional :class:`SparqlEndpointConfig` to attach
            to the resulting connectors for authentication.

    Returns:
        Bindings with SPARQL connectors for each class.
    """
    from graflo.hq.rdf_inferencer import RdfInferenceManager

    mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor)
    bindings = mgr.create_bindings(
        source,
        endpoint_url=endpoint_url,
        graph_uri=graph_uri,
    )

    if sparql_config:
        conn_proxy = "sparql_source"
        provider = InMemoryConnectionProvider()
        provider.register_generalized_config(
            conn_proxy=conn_proxy,
            config=SparqlGeneralizedConnConfig(config=sparql_config),
        )
        provider.default_sparql = sparql_config

        # Wire all SPARQL connectors to the same credential proxy.
        from graflo.architecture.contract.bindings import SparqlConnector

        for connector in bindings.connectors:
            if not isinstance(connector, SparqlConnector):
                continue
            bindings.bind_connector_to_conn_proxy(connector, conn_proxy)
            provider.bind_connector_to_conn_proxy(
                connector=connector, conn_proxy=conn_proxy
            )
    else:
        provider = EmptyConnectionProvider()
    self.connection_provider = provider
    return bindings
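
A sketch under stated assumptions: the URLs are placeholders, and the `SparqlEndpointConfig` constructor arguments are elided because its fields are not shown on this page:

```python
sparql_cfg: SparqlEndpointConfig = ...  # construct with endpoint credentials; fields not shown here

bindings = engine.create_bindings_from_rdf(
    "ontology.ttl",                               # TBox: local ontology file
    endpoint_url="https://example.org/sparql",    # ABox: where the instance data lives
    graph_uri="https://example.org/graphs/data",
    sparql_config=sparql_cfg,                     # wires all connectors to one credential proxy
)
```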

define_and_ingest(manifest, target_db_config, ingestion_params=None, connection_provider=None, recreate_schema=None, clear_data=None, graph_target_namespace=None)

Define schema and ingest data into the graph database in one operation.

This is a convenience method that chains define_schema() and ingest(). It's the recommended way to set up and populate a graph database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `manifest` | `GraphManifest` | GraphManifest with schema/ingestion/bindings blocks. | required |
| `target_db_config` | `DBConfig` | Target database connection configuration | required |
| `ingestion_params` | `IngestionParams \| None` | IngestionParams instance with ingestion configuration. If None, uses default IngestionParams() | `None` |
| `connection_provider` | `ConnectionProvider \| None` | Optional ConnectionProvider for resolving connector connections. If None, falls back to the engine's connection_provider. | `None` |
| `recreate_schema` | `bool \| None` | If True, drop existing schema and define new one. If None, defaults to False. When False and schema already exists, define_schema raises SchemaExistsError and the script halts. | `None` |
| `clear_data` | `bool \| None` | If True, remove existing data before ingestion (schema unchanged). If None, uses ingestion_params.clear_data. | `None` |
| `graph_target_namespace` | `str \| None` | Optional target graph/database/space name; passed to both `define_schema` and `ingest` for consistent resolution. | `None` |
Source code in graflo/hq/graph_engine.py
def define_and_ingest(
    self,
    manifest: GraphManifest,
    target_db_config: DBConfig,
    ingestion_params: IngestionParams | None = None,
    connection_provider: ConnectionProvider | None = None,
    recreate_schema: bool | None = None,
    clear_data: bool | None = None,
    graph_target_namespace: str | None = None,
) -> None:
    """Define schema and ingest data into the graph database in one operation.

    This is a convenience method that chains define_schema() and ingest().
    It's the recommended way to set up and populate a graph database.

    Args:
        manifest: GraphManifest with schema/ingestion/bindings blocks.
        target_db_config: Target database connection configuration
        ingestion_params: IngestionParams instance with ingestion configuration.
            If None, uses default IngestionParams()
        connection_provider: Optional ConnectionProvider for resolving connector
            connections. If None, falls back to the engine's connection_provider.
        recreate_schema: If True, drop existing schema and define new one.
            If None, defaults to False. When False and schema already exists,
            define_schema raises SchemaExistsError and the script halts.
        clear_data: If True, remove existing data before ingestion (schema unchanged).
            If None, uses ingestion_params.clear_data.
        graph_target_namespace: Optional target graph/database/space name; passed
            to both ``define_schema`` and ``ingest`` for consistent resolution.
    """
    ingestion_params = ingestion_params or IngestionParams()
    if clear_data is None:
        clear_data = ingestion_params.clear_data
    if recreate_schema is None:
        recreate_schema = False

    # Define schema first (halts with SchemaExistsError if schema exists and recreate_schema is False)
    self.define_schema(
        manifest=manifest,
        target_db_config=target_db_config,
        recreate_schema=recreate_schema,
        graph_target_namespace=graph_target_namespace,
    )

    # Then ingest data (clear_data is applied inside ingest() when ingestion_params.clear_data)
    ingestion_params = ingestion_params.model_copy(
        update={"clear_data": clear_data}
    )
    self.ingest(
        manifest=manifest,
        target_db_config=target_db_config,
        ingestion_params=ingestion_params,
        connection_provider=connection_provider,
        graph_target_namespace=graph_target_namespace,
    )
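
For example (illustrative; `manifest` and `target_cfg` come from earlier steps):

```python
engine.define_and_ingest(
    manifest=manifest,
    target_db_config=target_cfg,
    recreate_schema=True,  # drop and redefine any existing schema instead of halting
    clear_data=False,      # no separate data wipe before ingestion
)
```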

define_schema(manifest, target_db_config, recreate_schema=False, graph_target_namespace=None)

Define schema in the target database.

This method handles database/schema creation and initialization. Some databases don't require explicit schema definition (e.g., Neo4j), but this method ensures the database is properly initialized.

If the schema/graph already exists and recreate_schema is False (default), init_db raises SchemaExistsError and the script halts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `manifest` | `GraphManifest` | GraphManifest with schema block. | required |
| `target_db_config` | `DBConfig` | Target database connection configuration | required |
| `recreate_schema` | `bool` | If True, drop existing schema and define new one. If False and schema/graph already exists, raises SchemaExistsError. | `False` |
| `graph_target_namespace` | `str \| None` | Optional target graph/database/space name (e.g. temp schema). Overrides `schema.db_profile.target_namespace` and defaults ahead of `schema.metadata.name` when the config omits the namespace. | `None` |
Source code in graflo/hq/graph_engine.py
def define_schema(
    self,
    manifest: GraphManifest,
    target_db_config: DBConfig,
    recreate_schema: bool = False,
    graph_target_namespace: str | None = None,
) -> None:
    """Define schema in the target database.

    This method handles database/schema creation and initialization.
    Some databases don't require explicit schema definition (e.g., Neo4j),
    but this method ensures the database is properly initialized.

    If the schema/graph already exists and recreate_schema is False (default),
    init_db raises SchemaExistsError and the script halts.

    Args:
        manifest: GraphManifest with schema block.
        target_db_config: Target database connection configuration
        recreate_schema: If True, drop existing schema and define new one.
            If False and schema/graph already exists, raises SchemaExistsError.
        graph_target_namespace: Optional target graph/database/space name (e.g. temp
            schema). Overrides ``schema.db_profile.target_namespace`` and defaults
            ahead of ``schema.metadata.name`` when the config omits the namespace.
    """
    schema = manifest.require_schema()

    _ensure_graph_target_namespace(schema, target_db_config, graph_target_namespace)

    # Ensure schema reflects target DB so finish_init applies DB-specific defaults.
    schema.db_profile.db_flavor = target_db_config.connection_type
    schema.finish_init()

    # Initialize database with schema definition
    # init_db() handles database/schema creation automatically
    # It checks if the database exists and creates it if needed
    with ConnectionManager(connection_config=target_db_config) as db_client:
        db_client.init_db(schema, recreate_schema)
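
A short sketch; the namespace value is hypothetical:

```python
# Raises SchemaExistsError if the graph already exists and recreate_schema is False.
engine.define_schema(
    manifest=manifest,
    target_db_config=target_cfg,
    recreate_schema=False,
    graph_target_namespace="staging_graph",  # hypothetical temp namespace override
)
```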

infer_manifest(postgres_config, schema_name=None, fuzzy_threshold=0.8, discard_disconnected_vertices=False)

Infer a GraphManifest from PostgreSQL database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `postgres_config` | `PostgresConfig` | PostgresConfig instance | required |
| `schema_name` | `str \| None` | Schema name to introspect (defaults to config schema_name or 'public') | `None` |
| `fuzzy_threshold` | `float` | Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8) | `0.8` |
| `discard_disconnected_vertices` | `bool` | If True, remove vertices that do not take part in any relation (and resources/actors that reference them). Default False. | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `GraphManifest` | `GraphManifest` | Inferred manifest with schema, ingestion model, and bindings. |

Source code in graflo/hq/graph_engine.py
def infer_manifest(
    self,
    postgres_config: PostgresConfig,
    schema_name: str | None = None,
    fuzzy_threshold: float = 0.8,
    discard_disconnected_vertices: bool = False,
) -> GraphManifest:
    """Infer a GraphManifest from PostgreSQL database.

    Args:
        postgres_config: PostgresConfig instance
        schema_name: Schema name to introspect (defaults to config schema_name or 'public')
        fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8)
        discard_disconnected_vertices: If True, remove vertices that do not take part in
            any relation (and resources/actors that reference them). Default False.

    Returns:
        GraphManifest: Inferred manifest with schema, ingestion model, and bindings.
    """
    with PostgresConnection(postgres_config) as postgres_conn:
        inferencer = SQLInferenceManager(
            conn=postgres_conn,
            target_db_flavor=self.target_db_flavor,
            fuzzy_threshold=fuzzy_threshold,
        )
        artifacts = inferencer.infer_artifacts(schema_name=schema_name)
        schema, ingestion_model = artifacts.schema, artifacts.ingestion_model
        bindings, provider = (
            self.resource_mapper.create_bindings_with_provider_from_introspection(
                introspection_result=artifacts.introspection_result,
                conn=postgres_conn,
                schema_name=schema_name,
            )
        )
        self.connection_provider = provider
    if discard_disconnected_vertices:
        disconnected = schema.remove_disconnected_vertices()
        ingestion_model.prune_to_graph(
            schema.core_schema, disconnected=disconnected
        )
        connected_resources = {
            resource.name for resource in ingestion_model.resources
        }
        connectors = list(bindings.connectors)
        resource_connector = list(bindings.resource_connector)
        connector_connection = list(bindings.connector_connection)

        connector_refs_all = set()
        for connector in connectors:
            connector_refs_all.add(connector.hash)
            if connector.name:
                connector_refs_all.add(connector.name)
        filtered_resource_connector = []
        mapped_connector_refs = set()
        for mapping in resource_connector:
            if isinstance(mapping, dict):
                resource_name = mapping.get("resource")
                connector_ref = mapping.get("connector")
            else:
                resource_name = mapping.resource
                connector_ref = mapping.connector
            if (
                resource_name in connected_resources
                and isinstance(connector_ref, str)
                and connector_ref in connector_refs_all
            ):
                filtered_resource_connector.append(mapping)
                mapped_connector_refs.add(connector_ref)
        filtered_connectors = [
            connector
            for connector in connectors
            if connector.resource_name in connected_resources
            or connector.hash in mapped_connector_refs
            or (
                connector.name is not None
                and connector.name in mapped_connector_refs
            )
        ]
        valid_connector_refs = set()
        for connector in filtered_connectors:
            valid_connector_refs.add(connector.hash)
            if connector.name:
                valid_connector_refs.add(connector.name)
        filtered_connector_connection = []
        for mapping in connector_connection:
            if isinstance(mapping, dict):
                connector_ref = mapping.get("connector")
            else:
                connector_ref = mapping.connector
            if connector_ref in valid_connector_refs:
                filtered_connector_connection.append(mapping)
        bindings_dict = bindings.to_dict(skip_defaults=False)
        bindings_dict["connectors"] = filtered_connectors
        bindings_dict["resource_connector"] = filtered_resource_connector
        bindings_dict["connector_connection"] = filtered_connector_connection
        bindings = Bindings.from_dict(bindings_dict)
    manifest = GraphManifest(
        graph_schema=schema, ingestion_model=ingestion_model, bindings=bindings
    )
    # Apply DB-flavor-specific sanitization a posteriori (reserved words,
    # TigerGraph identity normalization, etc.). Sanitizer is the single
    # entry point that maps `target_db_flavor` to the corresponding
    # evolution ops.
    Sanitizer(self.target_db_flavor).sanitize_manifest(manifest)
    return manifest
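
For instance, a stricter inference pass (values illustrative):

```python
manifest = engine.infer_manifest(
    postgres_config=pg_cfg,
    schema_name="public",
    fuzzy_threshold=0.9,                 # tighter fuzzy matching than the 0.8 default
    discard_disconnected_vertices=True,  # drop vertices with no relations, plus their resources
)
```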

infer_schema_from_rdf(source, *, endpoint_url=None, graph_uri=None, schema_name=None)

Infer a graflo Schema from an RDF / OWL ontology.

Reads the TBox (class and property declarations) and produces vertices (from owl:Class), fields (from owl:DatatypeProperty), and edges (from owl:ObjectProperty with domain/range).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str \| Path` | Path to an RDF file (e.g. `ontology.ttl`) or a base URL when using `endpoint_url`. | required |
| `endpoint_url` | `str \| None` | Optional SPARQL endpoint to CONSTRUCT the ontology from. | `None` |
| `graph_uri` | `str \| None` | Named graph containing the ontology. | `None` |
| `schema_name` | `str \| None` | Name for the resulting schema. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[Schema, IngestionModel]` | Fully initialised schema and ingestion model. |

Source code in graflo/hq/graph_engine.py
def infer_schema_from_rdf(
    self,
    source: str | Path,
    *,
    endpoint_url: str | None = None,
    graph_uri: str | None = None,
    schema_name: str | None = None,
) -> tuple[Schema, IngestionModel]:
    """Infer a graflo Schema from an RDF / OWL ontology.

    Reads the TBox (class and property declarations) and produces
    vertices (from ``owl:Class``), fields (from ``owl:DatatypeProperty``),
    and edges (from ``owl:ObjectProperty`` with domain/range).

    Args:
        source: Path to an RDF file (e.g. ``ontology.ttl``) or a base
            URL when using *endpoint_url*.
        endpoint_url: Optional SPARQL endpoint to CONSTRUCT the
            ontology from.
        graph_uri: Named graph containing the ontology.
        schema_name: Name for the resulting schema.

    Returns:
        tuple[Schema, IngestionModel]: fully initialised schema and ingestion model.
    """
    from graflo.hq.rdf_inferencer import RdfInferenceManager

    mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor)
    return mgr.infer_schema(
        source,
        endpoint_url=endpoint_url,
        graph_uri=graph_uri,
        schema_name=schema_name,
    )
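
Two hedged invocation patterns, one per source kind (file paths and URLs are placeholders):

```python
# From a local ontology file
schema, ingestion_model = engine.infer_schema_from_rdf(
    "ontology.ttl",
    schema_name="my_graph",
)

# From a SPARQL endpoint, CONSTRUCTing the ontology out of a named graph
schema, ingestion_model = engine.infer_schema_from_rdf(
    "https://example.org/ontology",             # base URL when endpoint_url is used
    endpoint_url="https://example.org/sparql",
    graph_uri="https://example.org/graphs/ontology",
)
```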

ingest(manifest, target_db_config, ingestion_params=None, connection_provider=None, graph_target_namespace=None)

Ingest data into the graph database.

If ingestion_params.clear_data is True, removes all existing data (without touching the schema) before ingestion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `manifest` | `GraphManifest` | GraphManifest with schema/ingestion/bindings blocks. | required |
| `target_db_config` | `DBConfig` | Target database connection configuration | required |
| `ingestion_params` | `IngestionParams \| None` | IngestionParams instance with ingestion configuration. If None, uses default IngestionParams() | `None` |
| `connection_provider` | `ConnectionProvider \| None` | Optional ConnectionProvider for resolving connector connections. If None, falls back to the engine's connection_provider. | `None` |
| `graph_target_namespace` | `str \| None` | Same semantics as `define_schema`; use when calling `ingest` without a prior `define_schema` on this config. | `None` |
Source code in graflo/hq/graph_engine.py
def ingest(
    self,
    manifest: GraphManifest,
    target_db_config: DBConfig,
    ingestion_params: IngestionParams | None = None,
    connection_provider: ConnectionProvider | None = None,
    graph_target_namespace: str | None = None,
) -> None:
    """Ingest data into the graph database.

    If ingestion_params.clear_data is True, removes all existing data
    (without touching the schema) before ingestion.

    Args:
        manifest: GraphManifest with schema/ingestion/bindings blocks.
        target_db_config: Target database connection configuration
        ingestion_params: IngestionParams instance with ingestion configuration.
            If None, uses default IngestionParams()
        connection_provider: Optional ConnectionProvider for resolving connector
            connections. If None, falls back to the engine's connection_provider.
        graph_target_namespace: Same semantics as ``define_schema``; use when
            calling ``ingest`` without a prior ``define_schema`` on this config.
    """
    schema = manifest.require_schema()
    ingestion_model = manifest.require_ingestion_model()
    bindings = manifest.bindings

    _ensure_graph_target_namespace(schema, target_db_config, graph_target_namespace)

    ingestion_params = ingestion_params or IngestionParams()
    if ingestion_params.clear_data:
        with ConnectionManager(connection_config=target_db_config) as db_client:
            clear_result = db_client.clear_data(schema)
            if inspect.isawaitable(clear_result):
                raise TypeError(
                    "clear_data must be synchronous so ingestion only starts "
                    "after data clearing has completed."
                )

    caster = Caster(
        schema=schema,
        ingestion_model=ingestion_model,
        ingestion_params=ingestion_params,
    )
    caster.ingest(
        target_db_config=target_db_config,
        bindings=bindings or Bindings(),
        ingestion_params=ingestion_params,
        connection_provider=connection_provider or self.connection_provider,
    )
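
A usage sketch; the IngestionParams import path is an assumption:

```python
from graflo import IngestionParams  # assumed import path; adjust to your install

params = IngestionParams(clear_data=True)  # wipe existing data first, schema untouched
engine.ingest(
    manifest=manifest,
    target_db_config=target_cfg,
    ingestion_params=params,  # connection_provider=None falls back to engine.connection_provider
)
```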

introspect(postgres_config, schema_name=None, include_raw_tables=True)

Introspect PostgreSQL schema and return a serializable result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `postgres_config` | `PostgresConfig` | PostgresConfig instance | required |
| `schema_name` | `str \| None` | Schema name to introspect (defaults to config schema_name or 'public') | `None` |
| `include_raw_tables` | `bool` | If True, include raw (unclassified) tables in the result | `True` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `SchemaIntrospectionResult` | `SchemaIntrospectionResult` | Introspection result (vertex_tables, edge_tables, raw_tables, schema_name) suitable for serialization. |

Source code in graflo/hq/graph_engine.py
def introspect(
    self,
    postgres_config: PostgresConfig,
    schema_name: str | None = None,
    include_raw_tables: bool = True,
) -> SchemaIntrospectionResult:
    """Introspect PostgreSQL schema and return a serializable result.

    Args:
        postgres_config: PostgresConfig instance
        schema_name: Schema name to introspect (defaults to config schema_name or 'public')
        include_raw_tables: If True, include raw (unclassified) tables in the result

    Returns:
        SchemaIntrospectionResult: Introspection result (vertex_tables, edge_tables,
            raw_tables, schema_name) suitable for serialization.
    """
    with PostgresConnection(postgres_config) as postgres_conn:
        inferencer = SQLInferenceManager(
            conn=postgres_conn,
            target_db_flavor=self.target_db_flavor,
        )
        return inferencer.introspect(
            schema_name=schema_name,
            include_raw_tables=include_raw_tables,
        )
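
A quick sketch; the result's fields follow the Returns description above:

```python
result = engine.introspect(postgres_config=pg_cfg, include_raw_tables=True)

# SchemaIntrospectionResult is serializable; its fields per the docstring:
print(result.schema_name)
print(len(result.vertex_tables), len(result.edge_tables), len(result.raw_tables))
```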