Quick Start Guide
This guide will help you get started with graflo by showing you how to transform data into a graph structure.
Basic Concepts
- graflo uses the `Caster` class to cast data into a property graph representation and, eventually, into a graph database.
- `Schema` encodes the logical graph representation (vertices, edges, identities, DB profile).
- `IngestionModel` defines resources/transforms and how records are mapped into graph entities.
- `Resource` defines how data is transformed into a graph (semantic mapping).
- `DataSource` defines where data comes from (files, APIs, SQL databases, in-memory objects).
- `Bindings` manages the mapping of resources to their physical data sources (files or PostgreSQL tables).
- `DataSourceRegistry` maps DataSources to Resources (many DataSources can map to the same Resource).
- Database backend configurations use Pydantic `BaseSettings` with environment variable support. Use `ArangoConfig`, `Neo4jConfig`, `TigergraphConfig`, `FalkordbConfig`, `MemgraphConfig`, `NebulaConfig`, or `PostgresConfig` directly, or load from docker `.env` files using `from_docker_env()`. All configs inherit from `DBConfig` and support a unified `database`/`schema_name` structure with `effective_database` and `effective_schema` properties for database-agnostic access. If `effective_schema` is not set, `GraphEngine.define_schema()` automatically uses `schema.metadata.name` as a fallback.
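The `effective_schema` fallback described above can be pictured with a small plain-Python stand-in (`DBConfigSketch` and `resolve_schema` are hypothetical, illustrative names, not graflo classes):

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative sketch only: mirrors the documented behavior where
# GraphEngine.define_schema() falls back to schema.metadata.name
# when effective_schema is not set.
@dataclass
class DBConfigSketch:
    database: Optional[str] = None
    schema_name: Optional[str] = None

    @property
    def effective_database(self) -> Optional[str]:
        return self.database

    @property
    def effective_schema(self) -> Optional[str]:
        return self.schema_name

def resolve_schema(config: DBConfigSketch, schema_metadata_name: str) -> str:
    # Use the configured schema when present, else the schema's metadata name.
    return config.effective_schema or schema_metadata_name
```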
Basic Example
Here's a simple example of transforming two types of CSV files, people and departments, into a graph:
import pathlib
from suthing import FileHandle
from graflo import Bindings, Caster, GraphManifest
from graflo.architecture.contract.bindings import FileConnector
from graflo.db.connection.onto import ArangoConfig
manifest = GraphManifest.from_config(FileHandle.load("manifest.yaml"))
manifest.finish_init()
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()
caster = Caster(schema=schema, ingestion_model=ingestion_model)
# Option 1: Load config from docker/arango/.env (recommended)
conn_conf = ArangoConfig.from_docker_env()
# Option 2: Load from environment variables
# Set environment variables:
# export ARANGO_URI=http://localhost:8529
# export ARANGO_USERNAME=root
# export ARANGO_PASSWORD=123
# export ARANGO_DATABASE=mygraph
conn_conf = ArangoConfig.from_env()
# Option 3: Load with custom prefix (for multiple configs)
# Set environment variables:
# export USER_ARANGO_URI=http://user-db:8529
# export USER_ARANGO_USERNAME=user
# export USER_ARANGO_PASSWORD=pass
# export USER_ARANGO_DATABASE=usergraph
user_conn_conf = ArangoConfig.from_env(prefix="USER")
# Option 4: Create config directly
# conn_conf = ArangoConfig(
# uri="http://localhost:8535",
# username="root",
# password="123",
# database="mygraph", # For ArangoDB, 'database' maps to schema/graph
# )
# Create bindings with file connectors
# FileConnector includes the path (sub_path) where files are located
bindings = Bindings()
people_connector = FileConnector(regex=r"^people.*\.csv$", sub_path=pathlib.Path("."))
bindings.add_connector(people_connector)
bindings.bind_resource("people", people_connector)

departments_connector = FileConnector(regex=r"^dep.*\.csv$", sub_path=pathlib.Path("."))
bindings.add_connector(departments_connector)
bindings.bind_resource("departments", departments_connector)
# Or initialize from explicit connector bindings
bindings = Bindings(
    connectors=[
        FileConnector(
            name="people_files",
            regex=r"^people.*\.csv$",
            sub_path=pathlib.Path("."),
        ),
        FileConnector(
            name="departments_files",
            regex=r"^dep.*\.csv$",
            sub_path=pathlib.Path("."),
        ),
    ],
    resource_connector=[
        {"resource": "people", "connector": "people_files"},
        {"resource": "departments", "connector": "departments_files"},
    ],
)
from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine
# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
    clear_data=False,
)
# Attach bindings to the manifest before orchestration.
ingest_manifest = manifest.model_copy(update={"bindings": bindings})
ingest_manifest.finish_init()
engine.define_and_ingest(
    manifest=ingest_manifest,
    target_db_config=conn_conf,  # Target database config
    ingestion_params=ingestion_params,
    recreate_schema=False,  # Set to True to drop and redefine schema (script halts if schema exists)
)
# Option 2: Use Caster directly (schema must be defined separately)
# engine = GraphEngine()
# engine.define_schema(manifest=manifest, target_db_config=conn_conf, recreate_schema=False)
#
# caster = Caster(schema=schema, ingestion_model=ingestion_model)
# caster.ingest(
#     target_db_config=conn_conf,
#     bindings=bindings,
#     ingestion_params=ingestion_params,
# )
Here `schema` defines the logical graph, while `ingestion_model` defines resources/transforms and `bindings` maps resources to physical data sources. See Creating a Manifest and Concepts — Schema for details.
`Bindings` maps resource names (from the `IngestionModel`) to one or more physical data sources (the same resource may list several connectors):
- `FileConnector`: for file-based resources, with a regex for matching filenames and a sub_path for the directory to search
- `TableConnector`: for PostgreSQL table resources (table/schema/view metadata lives on the connector; connection URLs and secrets are not stored in the manifest when using connector_connection — see below)
- `SparqlConnector`: RDF class / SPARQL endpoint wiring (same proxy pattern as SQL when needed)
For SQL and SPARQL sources, add connector_connection: a list of {"connector": "<connector name or hash>", "conn_proxy": "<label>"}. At runtime, register each conn_proxy on an InMemoryConnectionProvider (or your own ConnectionProvider) with a GeneralizedConnConfig. GraphEngine / ResourceMapper call bind_connector_to_conn_proxy when building bindings from Postgres or RDF workflows, so HQ and the manifest stay aligned.
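The filename matching that a FileConnector binding performs can be sketched in plain Python (`select_files` is a hypothetical helper for illustration, not graflo API): filenames found under sub_path are kept when they match the connector's regex.

```python
import re

# Illustrative sketch: keep only the filenames that match the
# connector-style regex (anchored match from the start of the name).
def select_files(filenames: list[str], regex: str) -> list[str]:
    pattern = re.compile(regex)
    return [name for name in filenames if pattern.match(name)]
```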
Single-DB quick path (one proxy label)
When all SQL connectors use the same conn_proxy, you can wire the runtime config in one call:
from graflo.hq.connection_provider import (
    InMemoryConnectionProvider,
    PostgresGeneralizedConnConfig,
)

provider = InMemoryConnectionProvider()
provider.bind_single_config_for_bindings(
    bindings=bindings,
    conn_proxy="postgres_source",
    config=PostgresGeneralizedConnConfig(config=postgres_conf),
)
engine.define_and_ingest(
    manifest=manifest,
    target_db_config=conn_conf,
    connection_provider=provider,
)
The ingest() method takes:
- target_db_config: Target graph database configuration (where to write the graph)
- bindings: Source data connectors (where to read data from: files or database tables)
🚀 Using PostgreSQL Tables as Data Sources
Automatically infer graph schemas from normalized PostgreSQL databases (3NF): no manual schema definition needed!
Requirements: works best with normalized (3NF) databases that have properly declared primary keys (PK) and foreign keys (FK). graflo uses heuristics to automatically detect vertex-like and edge-like tables, infer relationships from foreign keys, and map PostgreSQL types to graph types.
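To give a feel for the kind of heuristic involved (a simplified sketch, not graflo's actual detection logic; `classify_table` is a hypothetical function), a table whose primary key consists entirely of foreign-key columns pointing at two other tables behaves like an edge (join) table, while other keyed tables behave like vertices:

```python
# Simplified sketch of vertex-like vs edge-like table detection.
# `fks` maps an FK column name to the table it references.
def classify_table(pk_columns: list[str], fks: dict[str, str]) -> str:
    fk_targets = set(fks.values())
    # Join table: every PK column is an FK, and the FKs reference two tables.
    if pk_columns and set(pk_columns) <= set(fks) and len(fk_targets) == 2:
        return "edge"
    return "vertex"
```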
You can ingest data directly from PostgreSQL tables. First, infer the schema from your PostgreSQL database:
from graflo.hq import GraphEngine
from graflo.db.connection.onto import PostgresConfig
# Connect to PostgreSQL
pg_config = PostgresConfig.from_docker_env() # Or from_env(), or create directly
# Create GraphEngine and infer a full manifest from PostgreSQL
# (automatically detects vertices/edges/resources and also infers bindings)
# Connection is automatically managed inside infer_manifest()
engine = GraphEngine()
manifest = engine.infer_manifest(pg_config, schema_name="public")
# Inferred bindings are available on the manifest by default
bindings = manifest.require_bindings()
# You can still create or override bindings manually when needed
from graflo.architecture.contract.bindings import Bindings, TableConnector
bindings = Bindings()
users_connector = TableConnector(table_name="users", schema_name="public")
bindings.add_connector(users_connector)
bindings.bind_resource("users", users_connector)
products_connector = TableConnector(table_name="products", schema_name="public")
bindings.add_connector(products_connector)
bindings.bind_resource("products", products_connector)
# Ingest
from graflo.db.connection.onto import ArangoConfig
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams
arango_config = ArangoConfig.from_docker_env() # Target graph database
# Use GraphEngine for schema definition and ingestion
engine = GraphEngine()
ingestion_params = IngestionParams(
    clear_data=False,
    # Optional: restrict to a date range with datetime_after, datetime_before, datetime_column
    # (use with create_bindings(..., datetime_columns={...}) for per-table columns)
)
ingest_manifest = manifest.model_copy(update={"bindings": bindings})
ingest_manifest.finish_init()
engine.define_and_ingest(
    manifest=ingest_manifest,
    target_db_config=arango_config,  # Target graph database
    ingestion_params=ingestion_params,
    recreate_schema=False,  # Set to True to drop and redefine schema (script halts if schema exists)
)
Using API Data Sources
You can also ingest data from REST API endpoints:
from suthing import FileHandle

from graflo import Caster, DataSourceRegistry, GraphManifest
from graflo.data_source import DataSourceFactory, APIConfig, PaginationConfig
manifest = GraphManifest.from_config(FileHandle.load("manifest.yaml"))
manifest.finish_init()
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()
# Create API data source
api_config = APIConfig(
    url="https://api.example.com/users",
    method="GET",
    pagination=PaginationConfig(
        strategy="offset",
        offset_param="offset",
        limit_param="limit",
        page_size=100,
        has_more_path="has_more",
        data_path="data",
    ),
)
api_source = DataSourceFactory.create_api_data_source(api_config)
# Register with resource
registry = DataSourceRegistry()
registry.register(api_source, resource_name="users")
# Ingest
from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine
# Define schema first (required before ingestion)
engine = GraphEngine()
engine.define_schema(
    manifest=manifest,
    target_db_config=conn_conf,  # Target database config, e.g. from the Basic Example
    recreate_schema=False,
)
# Then ingest using Caster
caster = Caster(schema=schema, ingestion_model=ingestion_model)
ingestion_params = IngestionParams() # Use default parameters
import asyncio
asyncio.run(
    caster.ingest_data_sources(
        data_source_registry=registry,
        conn_conf=conn_conf,  # Target database config
        ingestion_params=ingestion_params,
    )
)
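The offset pagination configured above works roughly like this plain-Python sketch (not graflo's internals; `fetch` is a hypothetical callable): requests advance the offset by page_size until the has_more flag is false, collecting records from the configured data path.

```python
# Sketch of the "offset" pagination strategy: advance `offset` by
# `page_size` until the response reports has_more=False, gathering
# records from the "data" field (the configured data_path).
def paginate(fetch, page_size=100):
    offset, records = 0, []
    while True:
        resp = fetch(offset=offset, limit=page_size)
        records.extend(resp["data"])
        if not resp.get("has_more"):
            break
        offset += page_size
    return records
```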
Using Configuration Files
You can also use a configuration file to define data sources:
# data_sources.yaml
data_sources:
  - source_type: api
    resource_name: users
    config:
      url: https://api.example.com/users
      method: GET
      pagination:
        strategy: offset
        page_size: 100
        data_path: data
  - source_type: file
    resource_name: products
    path: data/products.json
Then use it with the CLI:
uv run ingest \
    --db-config-path config/db.yaml \
    --schema-path config/manifest.yaml \
    --data-source-config-path data_sources.yaml
Document cast errors and --doc-error-sink
If some source documents fail while casting a resource (bad shape, transform error, etc.), you can keep ingesting the rest with --on-doc-error skip (the default) and record each failure as gzip-compressed JSON lines:
uv run ingest \
    --db-config-path config/db.yaml \
    --schema-path config/manifest.yaml \
    --source-path data/ \
    --on-doc-error skip \
    --doc-error-sink ./artifacts/doc_cast_failures.jsonl.gz
Inspect the file with zcat ./artifacts/doc_cast_failures.jsonl.gz | head. The same option exists on IngestionParams.doc_error_sink_path when you drive Caster or GraphEngine from Python. Full behavior (budget limits, document preview bounds, logging when no path is set) is described under Document cast errors and doc error sink.
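The sink format can be mimicked in a few lines (an illustrative sketch of the gzip JSON-lines format, not graflo's actual writer; `write_doc_errors` is a hypothetical helper):

```python
import gzip
import json

# One gzip-compressed JSON line per failed document, matching the
# jsonl.gz format the sink option produces.
def write_doc_errors(path, failures):
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for failure in failures:
            fh.write(json.dumps(failure) + "\n")
```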
Database Configuration Options
graflo supports multiple ways to configure database connections:
Environment Variables
You can configure database connections using environment variables. Each database type has its own prefix:
ArangoDB:
export ARANGO_URI=http://localhost:8529
export ARANGO_USERNAME=root
export ARANGO_PASSWORD=123
export ARANGO_DATABASE=mygraph
Neo4j:
export NEO4J_URI=bolt://localhost:7687
export NEO4J_USERNAME=neo4j
export NEO4J_PASSWORD=password
export NEO4J_DATABASE=mydb
TigerGraph:
export TIGERGRAPH_URI=http://localhost:9000
export TIGERGRAPH_USERNAME=tigergraph
export TIGERGRAPH_PASSWORD=tigergraph
export TIGERGRAPH_SCHEMA_NAME=mygraph
FalkorDB:
export FALKORDB_URI=redis://localhost:6379
export FALKORDB_PASSWORD=
export FALKORDB_DATABASE=mygraph
Memgraph:
export MEMGRAPH_URI=bolt://localhost:7687
export MEMGRAPH_USERNAME=
export MEMGRAPH_PASSWORD=
export MEMGRAPH_DATABASE=memgraph
NebulaGraph:
export NEBULA_URI=nebula://localhost:9669
export NEBULA_USERNAME=root
export NEBULA_PASSWORD=nebula
export NEBULA_SCHEMA_NAME=mygraph
export NEBULA_VERSION=3 # "3" for v3.x (nGQL) or "5" for v5.x (GQL)
PostgreSQL:
export POSTGRES_URI=postgresql://localhost:5432
export POSTGRES_USERNAME=postgres
export POSTGRES_PASSWORD=password
export POSTGRES_DATABASE=mydb
export POSTGRES_SCHEMA_NAME=public
Then load the config:
from graflo.db.connection.onto import (
    ArangoConfig,
    Neo4jConfig,
    TigergraphConfig,
    FalkordbConfig,
    MemgraphConfig,
    NebulaConfig,
    PostgresConfig,
)
# Load from default environment variables
arango_conf = ArangoConfig.from_env()
neo4j_conf = Neo4jConfig.from_env()
tg_conf = TigergraphConfig.from_env()
falkordb_conf = FalkordbConfig.from_env()
memgraph_conf = MemgraphConfig.from_env()
nebula_conf = NebulaConfig.from_env()
pg_conf = PostgresConfig.from_env()
Multiple Configurations with Prefixes
For multiple database configurations, use prefixes:
# User database
export USER_ARANGO_URI=http://user-db:8529
export USER_ARANGO_USERNAME=user
export USER_ARANGO_PASSWORD=pass
export USER_ARANGO_DATABASE=usergraph
# Knowledge graph database
export KG_ARANGO_URI=http://kg-db:8529
export KG_ARANGO_USERNAME=kg
export KG_ARANGO_PASSWORD=secret
export KG_ARANGO_DATABASE=knowledgegraph
Docker Environment Files
Load from docker .env files (as in the Basic Example, which reads docker/arango/.env):
conn_conf = ArangoConfig.from_docker_env()
pg_conf = PostgresConfig.from_docker_env()
Direct Configuration
Create config objects directly:
conn_conf = ArangoConfig(
    uri="http://localhost:8529",
    username="root",
    password="123",
    database="mygraph",
)
Next Steps

- Explore the API Reference for detailed documentation
- Check out more Examples for advanced use cases
- Learn main concepts, such as `Schema` and its constituents
- Read about Data Sources for API and SQL integration