Example 5: PostgreSQL Schema Inference and Ingestion¶
This example demonstrates how to automatically infer a graph schema from a PostgreSQL database and ingest data directly from PostgreSQL tables into a graph database. This is particularly useful for migrating relational data to graph databases or creating graph views of existing PostgreSQL databases.
Overview¶
Instead of manually defining schemas and exporting data to files, this example shows how to:
- Automatically detect vertex-like and edge-like tables in PostgreSQL
- Infer the graph schema from the database structure
- Map PostgreSQL types to graflo Field types automatically
- Create patterns that map PostgreSQL tables to graph resources
- Ingest data directly from PostgreSQL into a graph database
PostgreSQL Database Structure¶
The example uses a PostgreSQL database with a typical 3NF (Third Normal Form) schema:
Vertex Tables (Entities)¶
`users` - User accounts:
- `id` (SERIAL PRIMARY KEY) - Unique user identifier
- `name` (VARCHAR) - User full name
- `email` (VARCHAR, UNIQUE) - User email address
- `created_at` (TIMESTAMP) - Account creation timestamp
`products` - Product catalog:
- `id` (SERIAL PRIMARY KEY) - Unique product identifier
- `name` (VARCHAR) - Product name
- `price` (DECIMAL) - Product price
- `description` (TEXT) - Product description
- `created_at` (TIMESTAMP) - Product creation timestamp
Edge Tables (Relationships)¶
`purchases` - Purchase transactions linking users to products:
- `id` (SERIAL PRIMARY KEY) - Unique purchase identifier
- `user_id` (INTEGER, FOREIGN KEY → users.id) - Purchasing user
- `product_id` (INTEGER, FOREIGN KEY → products.id) - Purchased product
- `purchase_date` (TIMESTAMP) - Date and time of purchase
- `quantity` (INTEGER) - Number of items purchased
- `total_amount` (DECIMAL) - Total purchase amount
`follows` - User follow relationships (self-referential):
- `id` (SERIAL PRIMARY KEY) - Unique follow relationship identifier
- `follower_id` (INTEGER, FOREIGN KEY → users.id) - User who is following
- `followed_id` (INTEGER, FOREIGN KEY → users.id) - User being followed
- `created_at` (TIMESTAMP) - When the follow relationship was created
Automatic Schema Inference¶
The `infer_schema_from_postgres()` function analyzes your PostgreSQL database and creates a complete graflo Schema. The process involves several steps:
How Schema Inference Works¶
- Table Discovery: The function queries PostgreSQL's information schema to discover all tables in the specified schema
- Column Analysis: For each table, it examines columns, data types, constraints (primary keys, foreign keys), and relationships
- Table Classification: Tables are classified as either vertex tables or edge tables using heuristics
- Schema Generation: A complete graflo Schema object is constructed with vertices, edges, resources, and type mappings
Detection Heuristics¶
The inference engine classifies tables using the following heuristics:
Vertex Tables:¶
- Have a primary key (identifies unique entities)
- Contain descriptive columns beyond just foreign keys
- Represent domain entities (users, products, etc.)
- Typically have more non-foreign-key columns than foreign keys
Edge Tables:¶
- Have 2+ foreign keys (representing relationships between entities)
- May have additional attributes (weights, timestamps, quantities)
- Represent relationships or transactions between entities
- Foreign keys point to vertex tables
Edge Relation Inference:¶
- The system analyzes table names to infer relationship names
- For example, the `purchases` table creates a relationship from `users` to `products`
- Self-referential tables (like `follows`) are automatically detected
- Fuzzy matching identifies source and target vertices from table names
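To make these heuristics concrete, here is a minimal sketch of the classification logic, assuming table metadata has already been introspected into plain dictionaries. This is an illustration only, not graflo's actual implementation:

```python
def classify_table(columns: list[dict]) -> str:
    """Classify a table as 'vertex' or 'edge' from simplified column metadata.

    Each column dict is assumed (for this sketch) to look like:
    {"name": "user_id", "is_primary_key": False, "is_foreign_key": True}
    """
    fk_count = sum(1 for c in columns if c["is_foreign_key"])
    # Tables with 2+ foreign keys look like relationships (edges);
    # tables dominated by descriptive columns look like entities (vertices).
    return "edge" if fk_count >= 2 else "vertex"


purchases_columns = [
    {"name": "id", "is_primary_key": True, "is_foreign_key": False},
    {"name": "user_id", "is_primary_key": False, "is_foreign_key": True},
    {"name": "product_id", "is_primary_key": False, "is_foreign_key": True},
    {"name": "quantity", "is_primary_key": False, "is_foreign_key": False},
]
assert classify_table(purchases_columns) == "edge"
```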
Automatic Type Mapping¶
PostgreSQL types are automatically mapped to graflo Field types with proper type information:
| PostgreSQL Type | graflo Field Type | Notes |
|---|---|---|
| `INTEGER`, `BIGINT`, `SERIAL` | `INT` | Integer types preserved |
| `VARCHAR`, `TEXT`, `CHAR` | `STRING` | String types preserved |
| `TIMESTAMP`, `DATE`, `TIME` | `DATETIME` | Temporal types preserved |
| `DECIMAL`, `NUMERIC`, `REAL`, `DOUBLE PRECISION` | `FLOAT` | Numeric types converted |
| `BOOLEAN` | `BOOL` | Boolean types preserved |
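As a rough illustration, the mapping can be thought of as a lookup table keyed by PostgreSQL type names. The keys below follow `information_schema` naming conventions; graflo's internal mapping may differ in detail:

```python
# Hypothetical lookup table mirroring the mapping above; illustration only.
PG_TO_GRAFLO_FIELD_TYPE = {
    "integer": "INT", "bigint": "INT", "serial": "INT",
    "character varying": "STRING", "text": "STRING", "character": "STRING",
    "timestamp without time zone": "DATETIME", "date": "DATETIME",
    "time without time zone": "DATETIME",
    "numeric": "FLOAT", "real": "FLOAT", "double precision": "FLOAT",
    "boolean": "BOOL",
}
```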
Inferred Schema Structure¶
The inferred schema automatically includes:
- Vertices: `users`, `products` (with typed fields matching PostgreSQL columns)
- Edges: `users → products` (from the `purchases` table) with weights `purchase_date`, `quantity`, `total_amount`; `users → users` (from the `follows` table) with weight `created_at`
- Resources: Automatically created for each table with appropriate actors
- Indexes: Primary keys become vertex indexes, foreign keys become edge indexes
- Weights: Additional columns in edge tables become edge weight properties
Graph Structure Visualization¶
The resulting graph structure shows the relationships between entities:
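A minimal Mermaid sketch of this topology:

```mermaid
%% Illustrative sketch of the inferred graph structure
graph LR
    users -->|purchases| products
    users -->|follows| users
```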
This diagram shows:
- Vertices: `users` and `products` as nodes
- Edges: the `users → products` relationship (purchases) and the `users → users` self-referential relationship (follows)
Vertex Fields Structure¶
Each vertex includes typed fields inferred from PostgreSQL columns:
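For example, the `users` vertex in the generated schema (shown in full later in this example) carries these typed fields:

```yaml
- name: users
  fields:
    - name: id
      type: INT
    - name: name
      type: STRING
    - name: email
      type: STRING
    - name: created_at
      type: DATETIME
  indexes:
    - fields: [id]
```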
Step-by-Step Guide¶
Step 1: Connect to PostgreSQL¶
First, establish a connection to your PostgreSQL database. This connection will be used to:
- Query the database schema (tables, columns, constraints)
- Read data from tables during ingestion
- Understand foreign key relationships
What happens:
The `PostgresConnection` class wraps a `psycopg2` connection and provides methods for schema introspection and data querying.
```python
from graflo.db.postgres import PostgresConnection
from graflo.db.connection.onto import PostgresConfig
# Option 1: Load from docker/postgres/.env (recommended)
postgres_conf = PostgresConfig.from_docker_env()
# Option 2: Load from environment variables
# Set: POSTGRES_URI, POSTGRES_USERNAME, POSTGRES_PASSWORD, POSTGRES_DATABASE, POSTGRES_SCHEMA_NAME
# postgres_conf = PostgresConfig.from_env()
# Option 3: Create config directly
# postgres_conf = PostgresConfig(
# uri="postgresql://localhost:5432",
# username="postgres",
# password="postgres",
# database="mydb",
# schema_name="public"
# )
postgres_conn = PostgresConnection(postgres_conf)
```
Step 2: Initialize Database (Optional)¶
If you need to set up the database schema, you can load it from a SQL file:
```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def load_mock_schema_if_needed(postgres_conn: PostgresConnection) -> None:
    """Load the mock schema SQL file into the PostgreSQL database if it exists."""
    schema_file = Path("mock_schema.sql")
    if not schema_file.exists():
        logger.warning("Mock schema file not found. Assuming database is already initialized.")
        return
    logger.info(f"Loading mock schema from {schema_file}")
    with open(schema_file, "r") as f:
        sql_content = f.read()
    # Execute the SQL statements in a single batch
    with postgres_conn.conn.cursor() as cursor:
        cursor.execute(sql_content)
    postgres_conn.conn.commit()


load_mock_schema_if_needed(postgres_conn)
```
Step 3: Infer Schema from PostgreSQL¶
Automatically generate a graflo Schema from your PostgreSQL database. This is the core of the automatic inference process:
What `infer_schema_from_postgres()` does:

1. Queries the PostgreSQL information schema: It discovers all tables in the specified schema, retrieves column information (names, types, constraints), identifies primary keys and foreign keys, and understands table relationships.
2. Classifies tables: Each table's structure is analyzed to determine whether it is a vertex table (entity) or an edge table (relationship). This classification uses heuristics based on primary keys, foreign keys, and column counts.
3. Infers relationships: For edge tables, the system identifies source and target vertices from foreign keys. It uses table name analysis and fuzzy matching to infer relationship names, and handles self-referential relationships (like `follows`).
4. Maps types: PostgreSQL column types are converted to graflo Field types, preserving type information for validation and optimization.
5. Creates resources: Resource definitions are generated for each table with appropriate actors (`VertexActor` for vertex tables, `EdgeActor` for edge tables). Foreign keys are mapped to vertex matching keys.
```python
from graflo.db.postgres import infer_schema_from_postgres
from graflo.onto import DBFlavor
from graflo.db.connection.onto import ArangoConfig, Neo4jConfig
# Connect to target graph database to determine flavor
target_config = ArangoConfig.from_docker_env() # or Neo4jConfig, TigergraphConfig
# Determine db_flavor from target config
from graflo.db import DBType
db_type = target_config.connection_type
db_flavor = DBFlavor(db_type.value) if db_type in (DBType.ARANGO, DBType.NEO4J, DBType.TIGERGRAPH) else DBFlavor.ARANGO
# Infer schema automatically
schema = infer_schema_from_postgres(
postgres_conn,
schema_name="public", # PostgreSQL schema name
db_flavor=db_flavor # Target graph database flavor
)
```
The inferred schema will have:
- Vertices: `users`, `products` with typed fields matching PostgreSQL column types
- Edges: `users → products` (from the `purchases` table) and `users → users` (from the `follows` table), each with weight properties
- Resources: Automatically created for each table with appropriate actors
What happens during inference:
- Table Analysis: Each table is examined for primary keys, foreign keys, and column types
- Vertex Detection: Tables with primary keys and descriptive columns become vertices
- Edge Detection: Tables with 2+ foreign keys become edges, with source/target inferred from foreign key relationships
- Type Mapping: PostgreSQL column types are mapped to graflo Field types
- Resource Creation: Each table gets a corresponding resource with actors that map PostgreSQL rows to graph elements
- Weight Extraction: Non-key columns in edge tables become edge weight properties
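Once inference completes, you can sanity-check the result. The sketch below uses the same accessors as the summary printed at the end of the complete example further down:

```python
# Inspect the inferred schema using accessors from the complete example below.
print(f"Schema: {schema.general.name}")                        # e.g. "public"
print(f"Vertices: {len(schema.vertex_config.vertices)}")       # users, products -> 2
print(f"Edges: {len(list(schema.edge_config.edges_list()))}")  # purchases, follows -> 2
print(f"Resources: {len(schema.resources)}")                   # one per table -> 4
```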
Step 4: Save Inferred Schema (Optional)¶
You can save the inferred schema to a YAML file for inspection or modification:
```python
import logging
from pathlib import Path

import yaml

logger = logging.getLogger(__name__)

schema_output_file = Path("generated-schema.yaml")
schema_dict = schema.to_dict()
with open(schema_output_file, "w") as f:
    yaml.safe_dump(schema_dict, f, default_flow_style=False, sort_keys=False)
logger.info(f"Inferred schema saved to {schema_output_file}")
```
Step 5: Create Patterns from PostgreSQL Tables¶
Create Patterns that map PostgreSQL tables to resources:
```python
from graflo.db.postgres import create_patterns_from_postgres
# Create patterns from PostgreSQL tables
patterns = create_patterns_from_postgres(
postgres_conn,
schema_name="public"
)
```
This creates `TablePattern` instances for each table, which:
- Map table names to resource names (e.g., the `users` table → the `users` resource)
- Store PostgreSQL connection configuration
- Enable the Caster to query data directly from PostgreSQL using SQL

How Patterns Work:
- Each `TablePattern` contains the PostgreSQL connection info and table name
- During ingestion, the Caster queries each table using SQL: `SELECT * FROM table_name`
- Data is streamed directly from PostgreSQL without intermediate files
- This enables efficient processing of large tables
Step 6: Ingest Data into Graph Database¶
Finally, ingest the data from PostgreSQL into your target graph database. This is where the actual data transformation and loading happens:
What happens during ingestion:

1. Resource processing: For each resource in the schema, the system queries the corresponding PostgreSQL table using SQL and streams rows without loading everything into memory.
2. Vertex creation: For vertex table resources, each row becomes a vertex. Column values become vertex properties, and the primary key becomes the vertex key/index.
3. Edge creation: For edge table resources, each row becomes an edge. Foreign keys are used to find the source and target vertices, additional columns become edge weight properties, and the edge connects the matched source vertex to the matched target vertex.
4. Batch processing: Data is processed in batches for efficiency. Vertices are created/upserted first; edges are created after the vertices exist.
5. Graph database storage: Data is written to the target graph database (ArangoDB/Neo4j/TigerGraph) using database-specific APIs for optimal performance. Duplicates and updates are handled based on indexes.
```python
from graflo import Caster
# Create Caster with inferred schema
caster = Caster(schema)
# Ingest data from PostgreSQL into graph database
from graflo.caster import IngestionParams
ingestion_params = IngestionParams(
clean_start=True, # Clear existing data first
)
caster.ingest(
output_config=target_config, # Target graph database config
patterns=patterns, # PostgreSQL table patterns
ingestion_params=ingestion_params,
)
# Cleanup
postgres_conn.close()
```
Complete Example¶
Here's the complete example combining all steps:
```python
import logging
from pathlib import Path
import yaml
from graflo import Caster
from graflo.onto import DBFlavor
from graflo.db import DBType
from graflo.db.postgres import (
PostgresConnection,
create_patterns_from_postgres,
infer_schema_from_postgres,
)
from graflo.db.connection.onto import ArangoConfig, PostgresConfig
logger = logging.getLogger(__name__)
# Step 1: Connect to PostgreSQL (source database)
postgres_conf = PostgresConfig.from_docker_env()
postgres_conn = PostgresConnection(postgres_conf)
# Step 2: Initialize database with mock schema if needed
# (Implementation details omitted - see full example in examples/5-ingest-postgres/ingest.py)
# Step 3: Connect to target graph database
target_config = ArangoConfig.from_docker_env() # or Neo4jConfig, TigergraphConfig
# Step 4: Infer Schema from PostgreSQL database structure
db_type = target_config.connection_type
db_flavor = (
DBFlavor(db_type.value)
if db_type in (DBType.ARANGO, DBType.NEO4J, DBType.TIGERGRAPH)
else DBFlavor.ARANGO
)
schema = infer_schema_from_postgres(
postgres_conn,
schema_name="public",
db_flavor=db_flavor
)
# Step 5: Save inferred schema to YAML (optional)
schema_output_file = Path("generated-schema.yaml")
with open(schema_output_file, "w") as f:
yaml.safe_dump(schema.to_dict(), f, default_flow_style=False, sort_keys=False)
logger.info(f"Inferred schema saved to {schema_output_file}")
# Step 6: Create Patterns from PostgreSQL tables
patterns = create_patterns_from_postgres(postgres_conn, schema_name="public")
# Step 7: Create Caster and ingest data
from graflo.caster import IngestionParams
caster = Caster(schema)
ingestion_params = IngestionParams(
clean_start=True, # Clear existing data first
)
caster.ingest(
output_config=target_config,
patterns=patterns,
ingestion_params=ingestion_params,
)
# Cleanup
postgres_conn.close()
print("\n" + "=" * 80)
print("Ingestion complete!")
print("=" * 80)
print(f"Schema: {schema.general.name}")
print(f"Vertices: {len(schema.vertex_config.vertices)}")
print(f"Edges: {len(list(schema.edge_config.edges_list()))}")
print(f"Resources: {len(schema.resources)}")
print("=" * 80)
Resource Mappings¶
The automatically generated resources map PostgreSQL tables to graph elements. Each resource defines how table rows are transformed into vertices and edges:
Users Resource¶
The `users` resource maps each row in the `users` table to a `users` vertex:
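The corresponding entry in the generated schema (see the full listing below) is:

```yaml
- resource_name: users
  apply:
    - vertex: users
```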
What happens:
- Each row in the `users` table becomes a vertex
- Column values (`id`, `name`, `email`, `created_at`) become vertex properties
- The `id` column is used as the vertex key/index
Products Resource¶
The `products` resource maps each row in the `products` table to a `products` vertex:
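The corresponding entry in the generated schema is:

```yaml
- resource_name: products
  apply:
    - vertex: products
```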
What happens:
- Each row in the `products` table becomes a vertex
- Column values (`id`, `name`, `price`, `description`, `created_at`) become vertex properties
- The `id` column is used as the vertex key/index
Purchases Resource¶
The `purchases` resource creates edges between `users` and `products` vertices:
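The corresponding entry in the generated schema is:

```yaml
- resource_name: purchases
  apply:
    - target_vertex: users
      map:
        user_id: id
    - target_vertex: products
      map:
        product_id: id
```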
What happens:
- Each row in the `purchases` table becomes an edge
- The `user_id` foreign key maps to the source `users` vertex (using `id` as the match key)
- The `product_id` foreign key maps to the target `products` vertex (using `id` as the match key)
- Additional columns (`purchase_date`, `quantity`, `total_amount`) become edge weight properties
- The edge connects the matched user vertex to the matched product vertex
Follows Resource¶
The `follows` resource creates self-referential edges between `users` vertices:
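The corresponding entry in the generated schema is:

```yaml
- resource_name: follows
  apply:
    - target_vertex: users
      map:
        follower_id: id
    - target_vertex: users
      map:
        followed_id: id
```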
What happens:
- Each row in the `follows` table becomes an edge
- The `follower_id` foreign key maps to the source `users` vertex (using `id` as the match key)
- The `followed_id` foreign key maps to the target `users` vertex (using `id` as the match key)
- The `created_at` column becomes an edge weight property
- The edge connects a user (follower) to another user (followed), creating a social network structure
Generated Schema Example¶
The inferred schema will look like this:
```yaml
general:
  name: public
vertex_config:
  vertices:
    - name: products
      fields:
        - name: id
          type: INT
        - name: name
          type: STRING
        - name: price
          type: FLOAT
        - name: description
          type: STRING
        - name: created_at
          type: DATETIME
      indexes:
        - fields: [id]
    - name: users
      fields:
        - name: id
          type: INT
        - name: name
          type: STRING
        - name: email
          type: STRING
        - name: created_at
          type: DATETIME
      indexes:
        - fields: [id]
edge_config:
  edges:
    - source: users
      target: products
      weights:
        direct:
          - name: purchase_date
          - name: quantity
          - name: total_amount
    - source: users
      target: users
      weights:
        direct:
          - name: created_at
resources:
  - resource_name: products
    apply:
      - vertex: products
  - resource_name: users
    apply:
      - vertex: users
  - resource_name: purchases
    apply:
      - target_vertex: users
        map:
          user_id: id
      - target_vertex: products
        map:
          product_id: id
  - resource_name: follows
    apply:
      - target_vertex: users
        map:
          follower_id: id
      - target_vertex: users
        map:
          followed_id: id
```
Key Features¶
Automatic Type Mapping¶
PostgreSQL types are automatically converted to graflo Field types with proper type information:
- Integer types (`INTEGER`, `BIGINT`, `SERIAL`) → `INT`
- String types (`VARCHAR`, `TEXT`, `CHAR`) → `STRING`
- Numeric types (`DECIMAL`, `NUMERIC`, `REAL`) → `FLOAT`
- Date/Time types (`TIMESTAMP`, `DATE`, `TIME`) → `DATETIME`
- Boolean types → `BOOL`
Intelligent Table Classification¶
The system automatically classifies tables as:
- Vertex tables: Tables with primary keys and descriptive columns
- Edge tables: Tables with 2+ foreign keys representing relationships
Automatic Resource Creation¶
Resources are automatically created for each table with appropriate actors:
- Vertex tables: Create a `VertexActor` to map rows to vertices
- Edge tables: Create an `EdgeActor` with proper field mappings for source and target vertices
Type-Safe Field Definitions¶
All fields in the inferred schema include type information, enabling:
- Better validation during ingestion
- Database-specific optimizations
- Type-aware filtering and querying
Data Type Handling¶
Special handling for PostgreSQL-specific types:
- Decimal/Numeric: Automatically converted to `float` when reading from PostgreSQL
- DateTime: Preserved as `datetime` objects during processing, serialized to ISO format for JSON
- Type preservation: Original types are preserved for accurate duplicate detection
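A minimal sketch of these conversions, assuming rows are read via `psycopg2` (illustrative only, not graflo's internal code):

```python
from datetime import datetime
from decimal import Decimal


def normalize_value(value):
    """Convert PostgreSQL-specific Python types for graph ingestion."""
    if isinstance(value, Decimal):
        return float(value)       # DECIMAL / NUMERIC -> float
    if isinstance(value, datetime):
        return value.isoformat()  # datetime -> ISO string for JSON payloads
    return value
```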
Data Flow and Processing¶
Understanding how data flows from PostgreSQL to the graph database:
Step-by-Step Data Processing¶
- Connection: Establish connection to PostgreSQL database
- Schema Inference: Analyze database structure and generate graflo Schema
- Pattern Creation: Create patterns that map PostgreSQL tables to resources
- Ingestion Process:
- For each resource, query the corresponding PostgreSQL table
- Transform each row according to the resource mapping
- Create vertices from vertex table rows
- Create edges from edge table rows, matching foreign keys to vertex keys
- Add weight properties to edges from additional columns
- Graph Database Storage: Store transformed data in the target graph database
Foreign Key Matching¶
The system uses foreign keys to match edge table rows to vertices:
- Edge table row: `{user_id: 5, product_id: 12, quantity: 2, total_amount: 99.98}`
- Source vertex match: Find the `users` vertex where `id = 5`
- Target vertex match: Find the `products` vertex where `id = 12`
- Edge creation: Create an edge from the matched user vertex to the matched product vertex
- Weight properties: Add `quantity: 2` and `total_amount: 99.98` to the edge
This matching happens automatically based on the foreign key relationships detected during schema inference.
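Expressed as a small sketch (illustrative data structures, not graflo's internal API):

```python
# One purchases row resolving into an edge, per the matching steps above.
row = {"user_id": 5, "product_id": 12, "quantity": 2, "total_amount": 99.98}

source_match = ("users", {"id": row["user_id"]})        # users vertex, id = 5
target_match = ("products", {"id": row["product_id"]})  # products vertex, id = 12
weights = {k: row[k] for k in ("quantity", "total_amount")}
```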
Benefits¶
- No manual schema definition: Schema is automatically inferred from existing database structure
- Direct database access: No need to export data to files first
- Automatic resource mapping: Tables are automatically mapped to graph resources
- Type safety: Proper handling of PostgreSQL-specific types with automatic conversion
- Flexible: Works with any 3NF PostgreSQL schema
- Time-saving: Reduces manual configuration significantly
- Maintainable: Schema can be regenerated when database structure changes
Use Cases¶
This pattern is particularly useful for:
- Data Migration: Migrating relational data to graph databases
- Graph Views: Creating graph views of existing PostgreSQL databases
- Relationship Analysis: Analyzing relationships in normalized database schemas
- Graph Analytics: Building graph analytics on top of transactional databases
- Legacy System Integration: Integrating legacy relational systems with modern graph databases
- Data Warehousing: Transforming relational data warehouses into graph structures
Customization¶
Modifying the Inferred Schema¶
After inference, you can modify the schema:
```python
# Infer schema
schema = infer_schema_from_postgres(postgres_conn, schema_name="public")
# Modify schema as needed
# Add custom transforms, filters, or additional edges
schema.vertex_config.vertices[0].filters.append(...)
# Use modified schema
caster = Caster(schema)
```
Manual Pattern Creation¶
You can also create patterns manually for more control:
```python
from graflo.util.onto import Patterns, TablePattern

patterns = Patterns(
    _resource_mapping={
        "users": ("db1", "users"),  # (config_key, table_name)
        "products": ("db1", "products"),
    },
    _postgres_connections={
        "db1": postgres_conf,  # Maps config_key to PostgresConfig
    },
)
```
Key Takeaways¶
- Automatic schema inference eliminates manual schema definition for 3NF databases
- Type mapping ensures proper type handling across PostgreSQL and graph databases
- Direct database access enables efficient data ingestion without intermediate files
- Flexible heuristics automatically detect vertices and edges from table structure
- Type-safe fields provide better validation and database-specific optimizations
- Resource generation automatically creates appropriate actors for each table
- Schema customization allows modifications after inference for specific use cases
Next Steps¶
- Explore the PostgreSQL Schema Inference API for advanced usage
- Learn about PostgreSQL Type Mapping for custom type conversions
- Check out Resource Mapping for custom resource creation
- See the full example code for complete implementation
For more examples and detailed explanations, refer to the API Reference.