
graflo.db.postgres

PostgreSQL database implementation.

This package provides PostgreSQL-specific implementations for schema introspection and connection management. It focuses on reading and analyzing 3NF schemas to identify vertex-like and edge-like tables and on inferring graflo Schema objects from them.

Key Components
  • PostgresConnection: PostgreSQL connection and schema introspection implementation
  • PostgresSchemaInferencer: Infers graflo Schema from PostgreSQL schemas
  • PostgresResourceMapper: Maps PostgreSQL tables to graflo Resources
Example

from graflo.db.postgres.heuristics import infer_schema_from_postgres
from graflo.db.postgres import PostgresConnection
from graflo.db.connection.onto import PostgresConfig

config = PostgresConfig.from_docker_env()
conn = PostgresConnection(config)
schema = infer_schema_from_postgres(conn, schema_name="public")
conn.close()

PostgresConnection

PostgreSQL connection for schema introspection.

This class provides PostgreSQL-specific functionality for connecting to databases and introspecting 3NF schemas to identify vertex-like and edge-like tables.

Attributes:

    config (PostgresConfig): PostgreSQL connection configuration
    conn: psycopg2 connection instance

Source code in graflo/db/postgres/conn.py
class PostgresConnection:
    """PostgreSQL connection for schema introspection.

    This class provides PostgreSQL-specific functionality for connecting to databases
    and introspecting 3NF schemas to identify vertex-like and edge-like tables.

    Attributes:
        config: PostgreSQL connection configuration
        conn: psycopg2 connection instance
    """

    def __init__(self, config: PostgresConfig):
        """Initialize PostgreSQL connection.

        Args:
            config: PostgreSQL connection configuration containing URI and credentials
        """
        self.config = config

        # Validate required config values
        if config.uri is None:
            raise ValueError("PostgreSQL connection requires a URI to be configured")
        if config.database is None:
            raise ValueError(
                "PostgreSQL connection requires a database name to be configured"
            )

        # Use config properties directly - all fallbacks are handled in PostgresConfig
        host = config.hostname or "localhost"
        port = int(config.port) if config.port else 5432
        database = config.database
        user = config.username or "postgres"
        password = config.password

        # Build connection parameters dict
        conn_params = {
            "host": host,
            "port": port,
            "database": database,
            "user": user,
        }

        if password:
            conn_params["password"] = password

        try:
            self.conn = psycopg2.connect(**conn_params)
            logger.info(f"Successfully connected to PostgreSQL database '{database}'")
        except Exception as e:
            logger.error(f"Failed to connect to PostgreSQL: {e}", exc_info=True)
            raise

    def read(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
        """Execute a SELECT query and return results as a list of dictionaries.

        Args:
            query: SQL SELECT query to execute
            params: Optional tuple of parameters for parameterized queries

        Returns:
            List of dictionaries, where each dictionary represents a row with column names as keys.
            Decimal values are converted to float for compatibility with graph databases.
        """
        from decimal import Decimal

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            # Convert rows to dictionaries and convert Decimal to float
            results = []
            for row in cursor.fetchall():
                row_dict = dict(row)
                # Convert Decimal to float for JSON/graph database compatibility
                for key, value in row_dict.items():
                    if isinstance(value, Decimal):
                        row_dict[key] = float(value)
                results.append(row_dict)

            return results

    def __enter__(self):
        """Enter the context manager.

        Returns:
            PostgresConnection: Self for use in 'with' statements
        """
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Exit the context manager.

        Ensures the connection is properly closed when exiting the context.

        Args:
            exc_type: Exception type if an exception occurred
            exc_value: Exception value if an exception occurred
            exc_traceback: Exception traceback if an exception occurred
        """
        self.close()
        return False  # Don't suppress exceptions

    def close(self):
        """Close the PostgreSQL connection."""
        if hasattr(self, "conn") and self.conn:
            try:
                self.conn.close()
                logger.debug("PostgreSQL connection closed")
            except Exception as e:
                logger.warning(
                    f"Error closing PostgreSQL connection: {e}", exc_info=True
                )

    def _check_information_schema_reliable(self, schema_name: str) -> bool:
        """Check if information_schema is reliable for the given schema.

        Args:
            schema_name: Schema name to check

        Returns:
            True if information_schema appears reliable, False otherwise
        """
        try:
            # Try to query information_schema.tables
            query = """
                SELECT COUNT(*) as count
                FROM information_schema.tables
                WHERE table_schema = %s
                  AND table_type = 'BASE TABLE'
            """
            with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, (schema_name,))
                result = cursor.fetchone()
                # If query succeeds, check if we can also query constraints
                pk_query = """
                    SELECT COUNT(*) as count
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.key_column_usage kcu
                        ON tc.constraint_name = kcu.constraint_name
                        AND tc.table_schema = kcu.table_schema
                    WHERE tc.constraint_type = 'PRIMARY KEY'
                      AND tc.table_schema = %s
                """
                cursor.execute(pk_query, (schema_name,))
                pk_result = cursor.fetchone()
                # If both queries work, information_schema seems reliable
                return result is not None and pk_result is not None
        except Exception as e:
            logger.debug(f"information_schema check failed: {e}")
            return False

    def _get_tables_pg_catalog(self, schema_name: str) -> list[dict[str, Any]]:
        """Get all tables using pg_catalog (fallback method).

        Args:
            schema_name: Schema name to query

        Returns:
            List of table information dictionaries with keys: table_name, table_schema
        """
        query = """
            SELECT
                c.relname as table_name,
                n.nspname as table_schema
            FROM pg_catalog.pg_class c
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE n.nspname = %s
              AND c.relkind = 'r'
              AND NOT c.relispartition
            ORDER BY c.relname;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name,))
            return [dict(row) for row in cursor.fetchall()]

    def get_tables(self, schema_name: str | None = None) -> list[dict[str, Any]]:
        """Get all tables in the specified schema.

        Tries information_schema first, falls back to pg_catalog if needed.

        Args:
            schema_name: Schema name to query. If None, uses 'public' or config schema_name.

        Returns:
            List of table information dictionaries with keys: table_name, table_schema
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        # Try information_schema first
        try:
            query = """
                SELECT table_name, table_schema
                FROM information_schema.tables
                WHERE table_schema = %s
                  AND table_type = 'BASE TABLE'
                ORDER BY table_name;
            """

            with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, (schema_name,))
                results = [dict(row) for row in cursor.fetchall()]
                # If we got results, check if information_schema is reliable
                if results and self._check_information_schema_reliable(schema_name):
                    return results
                # If no results or unreliable, fall back to pg_catalog
                logger.debug(
                    f"information_schema returned no results or is unreliable, "
                    f"falling back to pg_catalog for schema '{schema_name}'"
                )
        except Exception as e:
            logger.debug(
                f"information_schema query failed: {e}, falling back to pg_catalog"
            )

        # Fallback to pg_catalog
        return self._get_tables_pg_catalog(schema_name)

    def _get_table_columns_pg_catalog(
        self, table_name: str, schema_name: str
    ) -> list[dict[str, Any]]:
        """Get columns using pg_catalog (fallback method).

        Args:
            table_name: Name of the table
            schema_name: Schema name

        Returns:
            List of column information dictionaries with keys:
            name, type, description, is_nullable, column_default
        """
        query = """
            SELECT
                a.attname as name,
                pg_catalog.format_type(a.atttypid, a.atttypmod) as type,
                CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END as is_nullable,
                pg_catalog.pg_get_expr(d.adbin, d.adrelid) as column_default,
                COALESCE(dsc.description, '') as description
            FROM pg_catalog.pg_attribute a
            JOIN pg_catalog.pg_class c ON c.oid = a.attrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            LEFT JOIN pg_catalog.pg_attrdef d ON d.adrelid = a.attrelid AND d.adnum = a.attnum
            LEFT JOIN pg_catalog.pg_description dsc ON dsc.objoid = a.attrelid AND dsc.objsubid = a.attnum
            WHERE n.nspname = %s
              AND c.relname = %s
              AND a.attnum > 0
              AND NOT a.attisdropped
            ORDER BY a.attnum;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name, table_name))
            columns = []
            for row in cursor.fetchall():
                col_dict = dict(row)
                # Normalize type format
                if col_dict["type"]:
                    # Remove length info from type if present (e.g., "character varying(255)" -> "varchar")
                    type_str = col_dict["type"]
                    if "(" in type_str:
                        base_type = type_str.split("(")[0]
                        # Map common types
                        type_mapping = {
                            "character varying": "varchar",
                            "character": "char",
                            "double precision": "float8",
                            "real": "float4",
                            "integer": "int4",
                            "bigint": "int8",
                            "smallint": "int2",
                        }
                        col_dict["type"] = type_mapping.get(
                            base_type.lower(), base_type.lower()
                        )
                    else:
                        type_mapping = {
                            "character varying": "varchar",
                            "character": "char",
                            "double precision": "float8",
                            "real": "float4",
                            "integer": "int4",
                            "bigint": "int8",
                            "smallint": "int2",
                        }
                        col_dict["type"] = type_mapping.get(
                            type_str.lower(), type_str.lower()
                        )
                columns.append(col_dict)
            return columns

    def get_table_columns(
        self, table_name: str, schema_name: str | None = None
    ) -> list[dict[str, Any]]:
        """Get columns for a specific table with types and descriptions.

        Tries information_schema first, falls back to pg_catalog if needed.

        Args:
            table_name: Name of the table
            schema_name: Schema name. If None, uses 'public' or config schema_name.

        Returns:
            List of column information dictionaries with keys:
            name, type, description, is_nullable, column_default
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        # Try information_schema first
        try:
            query = """
                SELECT
                    c.column_name as name,
                    c.data_type as type,
                    c.udt_name as udt_name,
                    c.character_maximum_length,
                    c.is_nullable,
                    c.column_default,
                    COALESCE(d.description, '') as description
                FROM information_schema.columns c
                LEFT JOIN pg_catalog.pg_statio_all_tables st
                    ON st.schemaname = c.table_schema
                    AND st.relname = c.table_name
                LEFT JOIN pg_catalog.pg_description d
                    ON d.objoid = st.relid
                    AND d.objsubid = c.ordinal_position
                WHERE c.table_schema = %s
                  AND c.table_name = %s
                ORDER BY c.ordinal_position;
            """

            with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, (schema_name, table_name))
                columns = []
                for row in cursor.fetchall():
                    col_dict = dict(row)
                    # Format type with length if applicable
                    if col_dict["character_maximum_length"]:
                        col_dict["type"] = (
                            f"{col_dict['type']}({col_dict['character_maximum_length']})"
                        )
                    # Use udt_name if it's more specific (e.g., varchar, int4)
                    if (
                        col_dict["udt_name"]
                        and col_dict["udt_name"] != col_dict["type"]
                    ):
                        col_dict["type"] = col_dict["udt_name"]
                    # Remove helper fields
                    col_dict.pop("character_maximum_length", None)
                    col_dict.pop("udt_name", None)
                    columns.append(col_dict)

                # If we got results and information_schema is reliable, return them
                if columns and self._check_information_schema_reliable(schema_name):
                    return columns
                # Otherwise fall back to pg_catalog
                logger.debug(
                    f"information_schema returned no results or is unreliable, "
                    f"falling back to pg_catalog for table '{schema_name}.{table_name}'"
                )
        except Exception as e:
            logger.debug(
                f"information_schema query failed: {e}, falling back to pg_catalog"
            )

        # Fallback to pg_catalog
        return self._get_table_columns_pg_catalog(table_name, schema_name)

    def _get_primary_keys_pg_catalog(
        self, table_name: str, schema_name: str
    ) -> list[str]:
        """Get primary key columns using pg_catalog (fallback method).

        Args:
            table_name: Name of the table
            schema_name: Schema name

        Returns:
            List of primary key column names
        """
        query = """
            SELECT a.attname
            FROM pg_catalog.pg_constraint con
            JOIN pg_catalog.pg_class c ON c.oid = con.conrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_catalog.pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = ANY(con.conkey)
            WHERE n.nspname = %s
              AND c.relname = %s
              AND con.contype = 'p'
            ORDER BY array_position(con.conkey, a.attnum);
        """

        with self.conn.cursor() as cursor:
            cursor.execute(query, (schema_name, table_name))
            return [row[0] for row in cursor.fetchall()]

    def get_primary_keys(
        self, table_name: str, schema_name: str | None = None
    ) -> list[str]:
        """Get primary key columns for a table.

        Tries information_schema first, falls back to pg_catalog if needed.

        Args:
            table_name: Name of the table
            schema_name: Schema name. If None, uses 'public' or config schema_name.

        Returns:
            List of primary key column names
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        # Try information_schema first
        try:
            query = """
                SELECT kcu.column_name
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                    ON tc.constraint_name = kcu.constraint_name
                    AND tc.table_schema = kcu.table_schema
                WHERE tc.constraint_type = 'PRIMARY KEY'
                  AND tc.table_schema = %s
                  AND tc.table_name = %s
                ORDER BY kcu.ordinal_position;
            """

            with self.conn.cursor() as cursor:
                cursor.execute(query, (schema_name, table_name))
                results = [row[0] for row in cursor.fetchall()]
                # If we got results and information_schema is reliable, return them
                if results and self._check_information_schema_reliable(schema_name):
                    return results
                # Otherwise fall back to pg_catalog
                logger.debug(
                    f"information_schema returned no results or is unreliable, "
                    f"falling back to pg_catalog for primary keys of '{schema_name}.{table_name}'"
                )
        except Exception as e:
            logger.debug(
                f"information_schema query failed: {e}, falling back to pg_catalog"
            )

        # Fallback to pg_catalog
        return self._get_primary_keys_pg_catalog(table_name, schema_name)

    def _get_foreign_keys_pg_catalog(
        self, table_name: str, schema_name: str
    ) -> list[dict[str, Any]]:
        """Get foreign key relationships using pg_catalog (fallback method).

        Handles both single-column and multi-column foreign keys.
        For multi-column foreign keys, returns one row per column.

        Args:
            table_name: Name of the table
            schema_name: Schema name

        Returns:
            List of foreign key dictionaries with keys:
            column, references_table, references_column, constraint_name
        """
        # Use generate_subscripts for better compatibility with older PostgreSQL versions
        query = """
            SELECT
                a.attname as column,
                ref_c.relname as references_table,
                ref_a.attname as references_column,
                con.conname as constraint_name
            FROM pg_catalog.pg_constraint con
            JOIN pg_catalog.pg_class c ON c.oid = con.conrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_catalog.pg_class ref_c ON ref_c.oid = con.confrelid
            JOIN generate_subscripts(con.conkey, 1) AS i ON true
            JOIN pg_catalog.pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = con.conkey[i]
            JOIN pg_catalog.pg_attribute ref_a ON ref_a.attrelid = con.confrelid AND ref_a.attnum = con.confkey[i]
            WHERE n.nspname = %s
              AND c.relname = %s
              AND con.contype = 'f'
            ORDER BY con.conname, i;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name, table_name))
            return [dict(row) for row in cursor.fetchall()]

    def get_foreign_keys(
        self, table_name: str, schema_name: str | None = None
    ) -> list[dict[str, Any]]:
        """Get foreign key relationships for a table.

        Tries information_schema first, falls back to pg_catalog if needed.

        Args:
            table_name: Name of the table
            schema_name: Schema name. If None, uses 'public' or config schema_name.

        Returns:
            List of foreign key dictionaries with keys:
            column, references_table, references_column, constraint_name
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        # Try information_schema first
        try:
            query = """
                SELECT
                    kcu.column_name as column,
                    ccu.table_name as references_table,
                    ccu.column_name as references_column,
                    tc.constraint_name
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                    ON tc.constraint_name = kcu.constraint_name
                    AND tc.table_schema = kcu.table_schema
                JOIN information_schema.constraint_column_usage ccu
                    ON ccu.constraint_name = tc.constraint_name
                    AND ccu.table_schema = tc.table_schema
                WHERE tc.constraint_type = 'FOREIGN KEY'
                  AND tc.table_schema = %s
                  AND tc.table_name = %s
                ORDER BY kcu.ordinal_position;
            """

            with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, (schema_name, table_name))
                results = [dict(row) for row in cursor.fetchall()]
                # If we got results and information_schema is reliable, return them
                if self._check_information_schema_reliable(schema_name):
                    return results
                # Otherwise fall back to pg_catalog
                logger.debug(
                    f"information_schema returned no results or is unreliable, "
                    f"falling back to pg_catalog for foreign keys of '{schema_name}.{table_name}'"
                )
        except Exception as e:
            logger.debug(
                f"information_schema query failed: {e}, falling back to pg_catalog"
            )

        # Fallback to pg_catalog
        return self._get_foreign_keys_pg_catalog(table_name, schema_name)

    def _is_edge_like_table(
        self, table_name: str, pk_columns: list[str], fk_columns: list[dict[str, Any]]
    ) -> bool:
        """Determine if a table is edge-like based on heuristics.

        Heuristics:
        1. Tables with 2 or more primary keys are likely edge tables
        2. Tables with exactly 2 foreign keys are likely edge tables
        3. Tables with names starting with 'rel_' are likely edge tables
        4. Tables where primary key columns match foreign key columns are likely edge tables

        Args:
            table_name: Name of the table
            pk_columns: List of primary key column names
            fk_columns: List of foreign key dictionaries

        Returns:
            True if table appears to be edge-like, False otherwise
        """
        # Heuristic 1: Tables with 2 or more primary keys are likely edge tables
        if len(pk_columns) >= 2:
            return True

        # Heuristic 2: Tables with exactly 2 foreign keys are likely edge tables
        if len(fk_columns) == 2:
            return True

        # Heuristic 3: Tables with names starting with 'rel_' are likely edge tables
        if table_name.startswith("rel_"):
            return True

        # Heuristic 4: If primary key columns match foreign key columns, it's likely an edge table
        fk_column_names = {fk["column"] for fk in fk_columns}
        pk_set = set(pk_columns)
        # If all PK columns are FK columns and we have at least 2 FKs, it's likely an edge table
        if pk_set.issubset(fk_column_names) and len(fk_columns) >= 2:
            return True

        return False

    def detect_vertex_tables(
        self, schema_name: str | None = None
    ) -> list[VertexTableInfo]:
        """Detect vertex-like tables in the schema.

        Heuristic: Tables with a primary key and descriptive columns
        (not just foreign keys). These represent entities.

        Note: Tables identified as edge-like are excluded from vertex tables.

        Args:
            schema_name: Schema name. If None, uses 'public' or config schema_name.

        Returns:
            List of VertexTableInfo objects
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        tables = self.get_tables(schema_name)
        vertex_tables = []

        for table_info in tables:
            table_name = table_info["table_name"]
            pk_columns = self.get_primary_keys(table_name, schema_name)
            fk_columns = self.get_foreign_keys(table_name, schema_name)
            all_columns = self.get_table_columns(table_name, schema_name)

            # Vertex-like tables have:
            # 1. A primary key
            # 2. Not identified as edge-like tables
            # 3. Descriptive columns beyond just foreign keys

            if not pk_columns:
                continue  # Skip tables without primary keys

            # Skip edge-like tables
            if self._is_edge_like_table(table_name, pk_columns, fk_columns):
                continue

            # Count non-FK, non-PK columns (descriptive columns)
            fk_column_names = {fk["column"] for fk in fk_columns}
            pk_column_names = set(pk_columns)
            descriptive_columns = [
                col
                for col in all_columns
                if col["name"] not in fk_column_names
                and col["name"] not in pk_column_names
            ]

            # If table has descriptive columns, consider it vertex-like
            if descriptive_columns:
                # Mark primary key columns and convert to ColumnInfo
                pk_set = set(pk_columns)
                column_infos = []
                for col in all_columns:
                    column_infos.append(
                        ColumnInfo(
                            name=col["name"],
                            type=col["type"],
                            description=col.get("description", ""),
                            is_nullable=col.get("is_nullable", "YES"),
                            column_default=col.get("column_default"),
                            is_pk=col["name"] in pk_set,
                        )
                    )

                # Convert foreign keys to ForeignKeyInfo
                fk_infos = []
                for fk in fk_columns:
                    fk_infos.append(
                        ForeignKeyInfo(
                            column=fk["column"],
                            references_table=fk["references_table"],
                            references_column=fk.get("references_column"),
                            constraint_name=fk.get("constraint_name"),
                        )
                    )

                vertex_tables.append(
                    VertexTableInfo(
                        name=table_name,
                        schema_name=schema_name,
                        columns=column_infos,
                        primary_key=pk_columns,
                        foreign_keys=fk_infos,
                    )
                )

        return vertex_tables

    def detect_edge_tables(
        self,
        schema_name: str | None = None,
        vertex_table_names: list[str] | None = None,
    ) -> list[EdgeTableInfo]:
        """Detect edge-like tables in the schema.

        Heuristic: Tables with 2 or more primary keys, or exactly 2 foreign keys,
        or names starting with 'rel_'. These represent relationships between entities.

        Args:
            schema_name: Schema name. If None, uses 'public' or config schema_name.
            vertex_table_names: Optional list of vertex table names for fuzzy matching.
                              If None, will be inferred from detect_vertex_tables().

        Returns:
            List of EdgeTableInfo objects with source_table and target_table
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        # Get vertex table names if not provided
        if vertex_table_names is None:
            vertex_tables = self.detect_vertex_tables(schema_name)
            vertex_table_names = [vt.name for vt in vertex_tables]

        # Create fuzzy match cache once for all tables (significant performance improvement)
        match_cache = FuzzyMatchCache(vertex_table_names)

        tables = self.get_tables(schema_name)
        edge_tables = []

        for table_info in tables:
            table_name = table_info["table_name"]
            pk_columns = self.get_primary_keys(table_name, schema_name)
            fk_columns = self.get_foreign_keys(table_name, schema_name)

            # Skip tables without primary keys
            if not pk_columns:
                continue

            # Check if table is edge-like
            if not self._is_edge_like_table(table_name, pk_columns, fk_columns):
                continue

            all_columns = self.get_table_columns(table_name, schema_name)

            # Mark primary key columns and convert to ColumnInfo
            pk_set = set(pk_columns)
            column_infos = []
            for col in all_columns:
                column_infos.append(
                    ColumnInfo(
                        name=col["name"],
                        type=col["type"],
                        description=col.get("description", ""),
                        is_nullable=col.get("is_nullable", "YES"),
                        column_default=col.get("column_default"),
                        is_pk=col["name"] in pk_set,
                    )
                )

            # Convert foreign keys to ForeignKeyInfo
            fk_infos = []
            for fk in fk_columns:
                fk_infos.append(
                    ForeignKeyInfo(
                        column=fk["column"],
                        references_table=fk["references_table"],
                        references_column=fk.get("references_column"),
                        constraint_name=fk.get("constraint_name"),
                    )
                )

            # Determine source and target tables
            source_table = None
            target_table = None
            source_column = None
            target_column = None
            relation_name = None

            # If we have exactly 2 foreign keys, use them directly
            if len(fk_infos) == 2:
                source_fk = fk_infos[0]
                target_fk = fk_infos[1]
                source_table = source_fk.references_table
                target_table = target_fk.references_table
                source_column = source_fk.column
                target_column = target_fk.column
                # Still try to infer relation from table name
                fk_dicts = [
                    {
                        "column": fk.column,
                        "references_table": fk.references_table,
                    }
                    for fk in fk_infos
                ]
                _, _, relation_name = infer_edge_vertices_from_table_name(
                    table_name, pk_columns, fk_dicts, vertex_table_names, match_cache
                )
            # If we have 2 or more primary keys, try to infer from table name and structure
            elif len(pk_columns) >= 2:
                # Convert fk_infos to dicts for _infer_edge_vertices_from_table_name
                fk_dicts = [
                    {
                        "column": fk.column,
                        "references_table": fk.references_table,
                    }
                    for fk in fk_infos
                ]

                # Try to infer from table name pattern
                inferred_source, inferred_target, relation_name = (
                    infer_edge_vertices_from_table_name(
                        table_name,
                        pk_columns,
                        fk_dicts,
                        vertex_table_names,
                        match_cache,
                    )
                )

                if inferred_source and inferred_target:
                    source_table = inferred_source
                    target_table = inferred_target
                    # Try to match PK columns to FK columns for source/target columns
                    if fk_infos:
                        # Use first FK for source, second for target if available
                        if len(fk_infos) >= 2:
                            source_column = fk_infos[0].column
                            target_column = fk_infos[1].column
                        elif len(fk_infos) == 1:
                            # Self-reference case
                            source_column = fk_infos[0].column
                            target_column = fk_infos[0].column
                    else:
                        # Use PK columns as source/target columns
                        source_column = pk_columns[0]
                        target_column = (
                            pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]
                        )
                elif fk_infos:
                    # Fallback: use FK references if available
                    if len(fk_infos) >= 2:
                        source_table = fk_infos[0].references_table
                        target_table = fk_infos[1].references_table
                        source_column = fk_infos[0].column
                        target_column = fk_infos[1].column
                    elif len(fk_infos) == 1:
                        source_table = fk_infos[0].references_table
                        target_table = fk_infos[0].references_table
                        source_column = fk_infos[0].column
                        target_column = fk_infos[0].column
                else:
                    # Last resort: use PK columns and infer table names from column names
                    source_column = pk_columns[0]
                    target_column = (
                        pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]
                    )
                    # Use robust inference logic to extract vertex names from column names
                    source_table = infer_vertex_from_column_name(
                        source_column, vertex_table_names, match_cache
                    )
                    target_table = infer_vertex_from_column_name(
                        target_column, vertex_table_names, match_cache
                    )

            # Only add if we have source and target information
            if source_table and target_table:
                edge_tables.append(
                    EdgeTableInfo(
                        name=table_name,
                        schema_name=schema_name,
                        columns=column_infos,
                        primary_key=pk_columns,
                        foreign_keys=fk_infos,
                        source_table=source_table,
                        target_table=target_table,
                        source_column=source_column or pk_columns[0],
                        target_column=target_column
                        or (pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]),
                        relation=relation_name,
                    )
                )
            else:
                logger.warning(
                    f"Could not determine source/target tables for edge-like table '{table_name}'. "
                    f"Skipping."
                )

        return edge_tables

    def introspect_schema(
        self, schema_name: str | None = None
    ) -> SchemaIntrospectionResult:
        """Introspect the database schema and return structured information.

        This is the main method that analyzes the schema and returns information
        about vertex-like and edge-like tables.

        Args:
            schema_name: Schema name. If None, uses 'public' or config schema_name.

        Returns:
            SchemaIntrospectionResult with vertex_tables, edge_tables, and schema_name
        """
        if schema_name is None:
            schema_name = self.config.schema_name or "public"

        logger.info(f"Introspecting PostgreSQL schema '{schema_name}'")

        vertex_tables = self.detect_vertex_tables(schema_name)
        edge_tables = self.detect_edge_tables(schema_name)

        result = SchemaIntrospectionResult(
            vertex_tables=vertex_tables,
            edge_tables=edge_tables,
            schema_name=schema_name,
        )

        logger.info(
            f"Found {len(vertex_tables)} vertex-like tables and {len(edge_tables)} edge-like tables"
        )

        return result
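
A minimal end-to-end sketch of the class above, assuming a reachable database and a PostgresConfig whose uri and database are set:

from graflo.db.connection.onto import PostgresConfig
from graflo.db.postgres import PostgresConnection

config = PostgresConfig.from_docker_env()
with PostgresConnection(config) as conn:
    result = conn.introspect_schema(schema_name="public")
    for vt in result.vertex_tables:
        print(vt.name, vt.primary_key)
    for et in result.edge_tables:
        print(et.name, et.source_table, et.target_table)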

__enter__()

Enter the context manager.

Returns:

    PostgresConnection: Self for use in 'with' statements

Source code in graflo/db/postgres/conn.py
def __enter__(self):
    """Enter the context manager.

    Returns:
        PostgresConnection: Self for use in 'with' statements
    """
    return self

__exit__(exc_type, exc_value, exc_traceback)

Exit the context manager.

Ensures the connection is properly closed when exiting the context.

Parameters:

    exc_type: Exception type if an exception occurred (required)
    exc_value: Exception value if an exception occurred (required)
    exc_traceback: Exception traceback if an exception occurred (required)
Source code in graflo/db/postgres/conn.py
def __exit__(self, exc_type, exc_value, exc_traceback):
    """Exit the context manager.

    Ensures the connection is properly closed when exiting the context.

    Args:
        exc_type: Exception type if an exception occurred
        exc_value: Exception value if an exception occurred
        exc_traceback: Exception traceback if an exception occurred
    """
    self.close()
    return False  # Don't suppress exceptions
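
Because __enter__ returns self and __exit__ always calls close(), the connection works as a context manager. A minimal sketch, reusing the config from the package example above:

with PostgresConnection(config) as conn:
    tables = conn.get_tables("public")
# the connection is closed here, even if the body raised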

__init__(config)

Initialize PostgreSQL connection.

Parameters:

    config (PostgresConfig): PostgreSQL connection configuration containing URI and credentials (required)
Source code in graflo/db/postgres/conn.py
def __init__(self, config: PostgresConfig):
    """Initialize PostgreSQL connection.

    Args:
        config: PostgreSQL connection configuration containing URI and credentials
    """
    self.config = config

    # Validate required config values
    if config.uri is None:
        raise ValueError("PostgreSQL connection requires a URI to be configured")
    if config.database is None:
        raise ValueError(
            "PostgreSQL connection requires a database name to be configured"
        )

    # Use config properties directly - all fallbacks are handled in PostgresConfig
    host = config.hostname or "localhost"
    port = int(config.port) if config.port else 5432
    database = config.database
    user = config.username or "postgres"
    password = config.password

    # Build connection parameters dict
    conn_params = {
        "host": host,
        "port": port,
        "database": database,
        "user": user,
    }

    if password:
        conn_params["password"] = password

    try:
        self.conn = psycopg2.connect(**conn_params)
        logger.info(f"Successfully connected to PostgreSQL database '{database}'")
    except Exception as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}", exc_info=True)
        raise
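
A construction sketch showing the validation behavior above; from_docker_env() is taken from the package example:

from graflo.db.connection.onto import PostgresConfig
from graflo.db.postgres import PostgresConnection

config = PostgresConfig.from_docker_env()
try:
    conn = PostgresConnection(config)  # raises ValueError if uri or database is unset
except ValueError as err:
    print(f"Incomplete PostgreSQL config: {err}")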

close()

Close the PostgreSQL connection.

Source code in graflo/db/postgres/conn.py
def close(self):
    """Close the PostgreSQL connection."""
    if hasattr(self, "conn") and self.conn:
        try:
            self.conn.close()
            logger.debug("PostgreSQL connection closed")
        except Exception as e:
            logger.warning(
                f"Error closing PostgreSQL connection: {e}", exc_info=True
            )
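
When the connection is not managed by a with block, an explicit close() in a finally clause gives the same guarantee. A minimal sketch:

conn = PostgresConnection(config)
try:
    rows = conn.read("SELECT 1 AS ok")
finally:
    conn.close()  # close() catches and logs errors instead of raising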

detect_edge_tables(schema_name=None, vertex_table_names=None)

Detect edge-like tables in the schema.

Heuristic: tables whose primary key spans two or more columns, tables with exactly two foreign keys, or tables whose names start with 'rel_'. These represent relationships between entities.

Parameters:

    schema_name (str | None): Schema name. If None, uses 'public' or the config schema_name. Default: None
    vertex_table_names (list[str] | None): Optional list of vertex table names for fuzzy matching. If None, inferred from detect_vertex_tables(). Default: None

Returns:

    list[EdgeTableInfo]: List of EdgeTableInfo objects, each with source_table and target_table

Source code in graflo/db/postgres/conn.py
def detect_edge_tables(
    self,
    schema_name: str | None = None,
    vertex_table_names: list[str] | None = None,
) -> list[EdgeTableInfo]:
    """Detect edge-like tables in the schema.

    Heuristic: Tables with 2 or more primary keys, or exactly 2 foreign keys,
    or names starting with 'rel_'. These represent relationships between entities.

    Args:
        schema_name: Schema name. If None, uses 'public' or config schema_name.
        vertex_table_names: Optional list of vertex table names for fuzzy matching.
                          If None, will be inferred from detect_vertex_tables().

    Returns:
        List of EdgeTableInfo objects with source_table and target_table
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    # Get vertex table names if not provided
    if vertex_table_names is None:
        vertex_tables = self.detect_vertex_tables(schema_name)
        vertex_table_names = [vt.name for vt in vertex_tables]

    # Create fuzzy match cache once for all tables (significant performance improvement)
    match_cache = FuzzyMatchCache(vertex_table_names)

    tables = self.get_tables(schema_name)
    edge_tables = []

    for table_info in tables:
        table_name = table_info["table_name"]
        pk_columns = self.get_primary_keys(table_name, schema_name)
        fk_columns = self.get_foreign_keys(table_name, schema_name)

        # Skip tables without primary keys
        if not pk_columns:
            continue

        # Check if table is edge-like
        if not self._is_edge_like_table(table_name, pk_columns, fk_columns):
            continue

        all_columns = self.get_table_columns(table_name, schema_name)

        # Mark primary key columns and convert to ColumnInfo
        pk_set = set(pk_columns)
        column_infos = []
        for col in all_columns:
            column_infos.append(
                ColumnInfo(
                    name=col["name"],
                    type=col["type"],
                    description=col.get("description", ""),
                    is_nullable=col.get("is_nullable", "YES"),
                    column_default=col.get("column_default"),
                    is_pk=col["name"] in pk_set,
                )
            )

        # Convert foreign keys to ForeignKeyInfo
        fk_infos = []
        for fk in fk_columns:
            fk_infos.append(
                ForeignKeyInfo(
                    column=fk["column"],
                    references_table=fk["references_table"],
                    references_column=fk.get("references_column"),
                    constraint_name=fk.get("constraint_name"),
                )
            )

        # Determine source and target tables
        source_table = None
        target_table = None
        source_column = None
        target_column = None
        relation_name = None

        # If we have exactly 2 foreign keys, use them directly
        if len(fk_infos) == 2:
            source_fk = fk_infos[0]
            target_fk = fk_infos[1]
            source_table = source_fk.references_table
            target_table = target_fk.references_table
            source_column = source_fk.column
            target_column = target_fk.column
            # Still try to infer relation from table name
            fk_dicts = [
                {
                    "column": fk.column,
                    "references_table": fk.references_table,
                }
                for fk in fk_infos
            ]
            _, _, relation_name = infer_edge_vertices_from_table_name(
                table_name, pk_columns, fk_dicts, vertex_table_names, match_cache
            )
        # If we have 2 or more primary keys, try to infer from table name and structure
        elif len(pk_columns) >= 2:
            # Convert fk_infos to dicts for _infer_edge_vertices_from_table_name
            fk_dicts = [
                {
                    "column": fk.column,
                    "references_table": fk.references_table,
                }
                for fk in fk_infos
            ]

            # Try to infer from table name pattern
            inferred_source, inferred_target, relation_name = (
                infer_edge_vertices_from_table_name(
                    table_name,
                    pk_columns,
                    fk_dicts,
                    vertex_table_names,
                    match_cache,
                )
            )

            if inferred_source and inferred_target:
                source_table = inferred_source
                target_table = inferred_target
                # Try to match PK columns to FK columns for source/target columns
                if fk_infos:
                    # Use first FK for source, second for target if available
                    if len(fk_infos) >= 2:
                        source_column = fk_infos[0].column
                        target_column = fk_infos[1].column
                    elif len(fk_infos) == 1:
                        # Self-reference case
                        source_column = fk_infos[0].column
                        target_column = fk_infos[0].column
                else:
                    # Use PK columns as source/target columns
                    source_column = pk_columns[0]
                    target_column = (
                        pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]
                    )
            elif fk_infos:
                # Fallback: use FK references if available
                if len(fk_infos) >= 2:
                    source_table = fk_infos[0].references_table
                    target_table = fk_infos[1].references_table
                    source_column = fk_infos[0].column
                    target_column = fk_infos[1].column
                elif len(fk_infos) == 1:
                    source_table = fk_infos[0].references_table
                    target_table = fk_infos[0].references_table
                    source_column = fk_infos[0].column
                    target_column = fk_infos[0].column
            else:
                # Last resort: use PK columns and infer table names from column names
                source_column = pk_columns[0]
                target_column = (
                    pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]
                )
                # Use robust inference logic to extract vertex names from column names
                source_table = infer_vertex_from_column_name(
                    source_column, vertex_table_names, match_cache
                )
                target_table = infer_vertex_from_column_name(
                    target_column, vertex_table_names, match_cache
                )

        # Only add if we have source and target information
        if source_table and target_table:
            edge_tables.append(
                EdgeTableInfo(
                    name=table_name,
                    schema_name=schema_name,
                    columns=column_infos,
                    primary_key=pk_columns,
                    foreign_keys=fk_infos,
                    source_table=source_table,
                    target_table=target_table,
                    source_column=source_column or pk_columns[0],
                    target_column=target_column
                    or (pk_columns[1] if len(pk_columns) > 1 else pk_columns[0]),
                    relation=relation_name,
                )
            )
        else:
            logger.warning(
                f"Could not determine source/target tables for edge-like table '{table_name}'. "
                f"Skipping."
            )

    return edge_tables
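
A sketch of driving edge detection directly, passing vertex names to skip the extra detection pass; conn is a PostgresConnection from the earlier examples:

vertex_names = [vt.name for vt in conn.detect_vertex_tables("public")]
for et in conn.detect_edge_tables("public", vertex_table_names=vertex_names):
    print(f"{et.source_table} -[{et.relation or et.name}]-> {et.target_table}")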

detect_vertex_tables(schema_name=None)

Detect vertex-like tables in the schema.

Heuristic: Tables with a primary key and descriptive columns (not just foreign keys). These represent entities.

Note: Tables identified as edge-like are excluded from vertex tables.

Parameters:

    schema_name (str | None): Schema name. If None, uses 'public' or the config schema_name. Default: None

Returns:

    list[VertexTableInfo]: List of VertexTableInfo objects

Source code in graflo/db/postgres/conn.py
def detect_vertex_tables(
    self, schema_name: str | None = None
) -> list[VertexTableInfo]:
    """Detect vertex-like tables in the schema.

    Heuristic: Tables with a primary key and descriptive columns
    (not just foreign keys). These represent entities.

    Note: Tables identified as edge-like are excluded from vertex tables.

    Args:
        schema_name: Schema name. If None, uses 'public' or config schema_name.

    Returns:
        List of VertexTableInfo objects
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    tables = self.get_tables(schema_name)
    vertex_tables = []

    for table_info in tables:
        table_name = table_info["table_name"]
        pk_columns = self.get_primary_keys(table_name, schema_name)
        fk_columns = self.get_foreign_keys(table_name, schema_name)
        all_columns = self.get_table_columns(table_name, schema_name)

        # Vertex-like tables have:
        # 1. A primary key
        # 2. Not identified as edge-like tables
        # 3. Descriptive columns beyond just foreign keys

        if not pk_columns:
            continue  # Skip tables without primary keys

        # Skip edge-like tables
        if self._is_edge_like_table(table_name, pk_columns, fk_columns):
            continue

        # Count non-FK, non-PK columns (descriptive columns)
        fk_column_names = {fk["column"] for fk in fk_columns}
        pk_column_names = set(pk_columns)
        descriptive_columns = [
            col
            for col in all_columns
            if col["name"] not in fk_column_names
            and col["name"] not in pk_column_names
        ]

        # If table has descriptive columns, consider it vertex-like
        if descriptive_columns:
            # Mark primary key columns and convert to ColumnInfo
            pk_set = set(pk_columns)
            column_infos = []
            for col in all_columns:
                column_infos.append(
                    ColumnInfo(
                        name=col["name"],
                        type=col["type"],
                        description=col.get("description", ""),
                        is_nullable=col.get("is_nullable", "YES"),
                        column_default=col.get("column_default"),
                        is_pk=col["name"] in pk_set,
                    )
                )

            # Convert foreign keys to ForeignKeyInfo
            fk_infos = []
            for fk in fk_columns:
                fk_infos.append(
                    ForeignKeyInfo(
                        column=fk["column"],
                        references_table=fk["references_table"],
                        references_column=fk.get("references_column"),
                        constraint_name=fk.get("constraint_name"),
                    )
                )

            vertex_tables.append(
                VertexTableInfo(
                    name=table_name,
                    schema_name=schema_name,
                    columns=column_infos,
                    primary_key=pk_columns,
                    foreign_keys=fk_infos,
                )
            )

    return vertex_tables
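
A minimal usage sketch, assuming conn is an already-open PostgresConnection and that a populated 'public' schema exists:

for vt in conn.detect_vertex_tables(schema_name="public"):
    # VertexTableInfo exposes name, columns, primary_key, and foreign_keys
    print(f"{vt.name}: pk={vt.primary_key}, {len(vt.columns)} columns")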

get_foreign_keys(table_name, schema_name=None)

Get foreign key relationships for a table.

Tries information_schema first, falls back to pg_catalog if needed.

Parameters:

- table_name (str): Name of the table. Required.
- schema_name (str | None): Schema name. If None, uses 'public' or config schema_name. Default: None

Returns:

- list[dict[str, Any]]: List of foreign key dictionaries with keys: column, references_table, references_column, constraint_name

Source code in graflo/db/postgres/conn.py
def get_foreign_keys(
    self, table_name: str, schema_name: str | None = None
) -> list[dict[str, Any]]:
    """Get foreign key relationships for a table.

    Tries information_schema first, falls back to pg_catalog if needed.

    Args:
        table_name: Name of the table
        schema_name: Schema name. If None, uses 'public' or config schema_name.

    Returns:
        List of foreign key dictionaries with keys:
        column, references_table, references_column, constraint_name
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    # Try information_schema first
    try:
        query = """
            SELECT
                kcu.column_name as column,
                ccu.table_name as references_table,
                ccu.column_name as references_column,
                tc.constraint_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage ccu
                ON ccu.constraint_name = tc.constraint_name
                AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
              AND tc.table_schema = %s
              AND tc.table_name = %s
            ORDER BY kcu.ordinal_position;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name, table_name))
            results = [dict(row) for row in cursor.fetchall()]
            # If we got results and information_schema is reliable, return them
            if self._check_information_schema_reliable(schema_name):
                return results
            # Otherwise fall back to pg_catalog
            logger.debug(
                f"information_schema returned no results or is unreliable, "
                f"falling back to pg_catalog for foreign keys of '{schema_name}.{table_name}'"
            )
    except Exception as e:
        logger.debug(
            f"information_schema query failed: {e}, falling back to pg_catalog"
        )

    # Fallback to pg_catalog
    return self._get_foreign_keys_pg_catalog(table_name, schema_name)

get_primary_keys(table_name, schema_name=None)

Get primary key columns for a table.

Tries information_schema first, falls back to pg_catalog if needed.

Parameters:

- table_name (str): Name of the table. Required.
- schema_name (str | None): Schema name. If None, uses 'public' or config schema_name. Default: None

Returns:

- list[str]: List of primary key column names

Source code in graflo/db/postgres/conn.py
def get_primary_keys(
    self, table_name: str, schema_name: str | None = None
) -> list[str]:
    """Get primary key columns for a table.

    Tries information_schema first, falls back to pg_catalog if needed.

    Args:
        table_name: Name of the table
        schema_name: Schema name. If None, uses 'public' or config schema_name.

    Returns:
        List of primary key column names
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    # Try information_schema first
    try:
        query = """
            SELECT kcu.column_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            WHERE tc.constraint_type = 'PRIMARY KEY'
              AND tc.table_schema = %s
              AND tc.table_name = %s
            ORDER BY kcu.ordinal_position;
        """

        with self.conn.cursor() as cursor:
            cursor.execute(query, (schema_name, table_name))
            results = [row[0] for row in cursor.fetchall()]
            # If we got results and information_schema is reliable, return them
            if results and self._check_information_schema_reliable(schema_name):
                return results
            # Otherwise fall back to pg_catalog
            logger.debug(
                f"information_schema returned no results or is unreliable, "
                f"falling back to pg_catalog for primary keys of '{schema_name}.{table_name}'"
            )
    except Exception as e:
        logger.debug(
            f"information_schema query failed: {e}, falling back to pg_catalog"
        )

    # Fallback to pg_catalog
    return self._get_primary_keys_pg_catalog(table_name, schema_name)
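
The two key accessors compose naturally when inspecting a table by hand. A sketch, assuming an open PostgresConnection conn and a hypothetical orders table:

pks = conn.get_primary_keys("orders", schema_name="public")
fks = conn.get_foreign_keys("orders", schema_name="public")

print("primary key:", pks)  # e.g. ['id']
for fk in fks:
    # each dict carries: column, references_table, references_column, constraint_name
    print(f"{fk['column']} -> {fk['references_table']}.{fk['references_column']}")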

get_table_columns(table_name, schema_name=None)

Get columns for a specific table with types and descriptions.

Tries information_schema first, falls back to pg_catalog if needed.

Parameters:

- table_name (str): Name of the table. Required.
- schema_name (str | None): Schema name. If None, uses 'public' or config schema_name. Default: None

Returns:

- list[dict[str, Any]]: List of column information dictionaries with keys: name, type, description, is_nullable, column_default

Source code in graflo/db/postgres/conn.py
def get_table_columns(
    self, table_name: str, schema_name: str | None = None
) -> list[dict[str, Any]]:
    """Get columns for a specific table with types and descriptions.

    Tries information_schema first, falls back to pg_catalog if needed.

    Args:
        table_name: Name of the table
        schema_name: Schema name. If None, uses 'public' or config schema_name.

    Returns:
        List of column information dictionaries with keys:
        name, type, description, is_nullable, column_default
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    # Try information_schema first
    try:
        query = """
            SELECT
                c.column_name as name,
                c.data_type as type,
                c.udt_name as udt_name,
                c.character_maximum_length,
                c.is_nullable,
                c.column_default,
                COALESCE(d.description, '') as description
            FROM information_schema.columns c
            LEFT JOIN pg_catalog.pg_statio_all_tables st
                ON st.schemaname = c.table_schema
                AND st.relname = c.table_name
            LEFT JOIN pg_catalog.pg_description d
                ON d.objoid = st.relid
                AND d.objsubid = c.ordinal_position
            WHERE c.table_schema = %s
              AND c.table_name = %s
            ORDER BY c.ordinal_position;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name, table_name))
            columns = []
            for row in cursor.fetchall():
                col_dict = dict(row)
                # Format type with length if applicable
                if col_dict["character_maximum_length"]:
                    col_dict["type"] = (
                        f"{col_dict['type']}({col_dict['character_maximum_length']})"
                    )
                # Use udt_name if it's more specific (e.g., varchar, int4)
                if (
                    col_dict["udt_name"]
                    and col_dict["udt_name"] != col_dict["type"]
                ):
                    col_dict["type"] = col_dict["udt_name"]
                # Remove helper fields
                col_dict.pop("character_maximum_length", None)
                col_dict.pop("udt_name", None)
                columns.append(col_dict)

            # If we got results and information_schema is reliable, return them
            if columns and self._check_information_schema_reliable(schema_name):
                return columns
            # Otherwise fall back to pg_catalog
            logger.debug(
                f"information_schema returned no results or is unreliable, "
                f"falling back to pg_catalog for table '{schema_name}.{table_name}'"
            )
    except Exception as e:
        logger.debug(
            f"information_schema query failed: {e}, falling back to pg_catalog"
        )

    # Fallback to pg_catalog
    return self._get_table_columns_pg_catalog(table_name, schema_name)
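
A sketch of reading column metadata, assuming an open connection conn and a hypothetical users table; the dictionary keys used below are the documented ones:

for col in conn.get_table_columns("users", schema_name="public"):
    nullable = "NULL" if col["is_nullable"] == "YES" else "NOT NULL"
    print(f"{col['name']}: {col['type']} {nullable} (default: {col['column_default']})")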

get_tables(schema_name=None)

Get all tables in the specified schema.

Tries information_schema first, falls back to pg_catalog if needed.

Parameters:

- schema_name (str | None): Schema name to query. If None, uses 'public' or config schema_name. Default: None

Returns:

- list[dict[str, Any]]: List of table information dictionaries with keys: table_name, table_schema

Source code in graflo/db/postgres/conn.py
def get_tables(self, schema_name: str | None = None) -> list[dict[str, Any]]:
    """Get all tables in the specified schema.

    Tries information_schema first, falls back to pg_catalog if needed.

    Args:
        schema_name: Schema name to query. If None, uses 'public' or config schema_name.

    Returns:
        List of table information dictionaries with keys: table_name, table_schema
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    # Try information_schema first
    try:
        query = """
            SELECT table_name, table_schema
            FROM information_schema.tables
            WHERE table_schema = %s
              AND table_type = 'BASE TABLE'
            ORDER BY table_name;
        """

        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (schema_name,))
            results = [dict(row) for row in cursor.fetchall()]
            # If we got results, check if information_schema is reliable
            if results and self._check_information_schema_reliable(schema_name):
                return results
            # If no results or unreliable, fall back to pg_catalog
            logger.debug(
                f"information_schema returned no results or is unreliable, "
                f"falling back to pg_catalog for schema '{schema_name}'"
            )
    except Exception as e:
        logger.debug(
            f"information_schema query failed: {e}, falling back to pg_catalog"
        )

    # Fallback to pg_catalog
    return self._get_tables_pg_catalog(schema_name)
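
A sketch enumerating base tables, assuming an open connection conn:

for t in conn.get_tables(schema_name="public"):
    print(f"{t['table_schema']}.{t['table_name']}")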

introspect_schema(schema_name=None)

Introspect the database schema and return structured information.

This is the main method that analyzes the schema and returns information about vertex-like and edge-like tables.

Parameters:

- schema_name (str | None): Schema name. If None, uses 'public' or config schema_name. Default: None

Returns:

- SchemaIntrospectionResult: Introspection result with vertex_tables, edge_tables, and schema_name

Source code in graflo/db/postgres/conn.py
def introspect_schema(
    self, schema_name: str | None = None
) -> SchemaIntrospectionResult:
    """Introspect the database schema and return structured information.

    This is the main method that analyzes the schema and returns information
    about vertex-like and edge-like tables.

    Args:
        schema_name: Schema name. If None, uses 'public' or config schema_name.

    Returns:
        SchemaIntrospectionResult with vertex_tables, edge_tables, and schema_name
    """
    if schema_name is None:
        schema_name = self.config.schema_name or "public"

    logger.info(f"Introspecting PostgreSQL schema '{schema_name}'")

    vertex_tables = self.detect_vertex_tables(schema_name)
    edge_tables = self.detect_edge_tables(schema_name)

    result = SchemaIntrospectionResult(
        vertex_tables=vertex_tables,
        edge_tables=edge_tables,
        schema_name=schema_name,
    )

    logger.info(
        f"Found {len(vertex_tables)} vertex-like tables and {len(edge_tables)} edge-like tables"
    )

    return result
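
A sketch of the one-call entry point, assuming an open connection conn; both table classifications come back in a single result:

result = conn.introspect_schema(schema_name="public")
print("vertex-like:", [t.name for t in result.vertex_tables])
print("edge-like:", [t.name for t in result.edge_tables])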

read(query, params=None)

Execute a SELECT query and return results as a list of dictionaries.

Parameters:

- query (str): SQL SELECT query to execute. Required.
- params (tuple | None): Optional tuple of parameters for parameterized queries. Default: None

Returns:

- list[dict[str, Any]]: List of dictionaries, where each dictionary represents a row with column names as keys. Decimal values are converted to float for compatibility with graph databases.

Source code in graflo/db/postgres/conn.py
def read(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
    """Execute a SELECT query and return results as a list of dictionaries.

    Args:
        query: SQL SELECT query to execute
        params: Optional tuple of parameters for parameterized queries

    Returns:
        List of dictionaries, where each dictionary represents a row with column names as keys.
        Decimal values are converted to float for compatibility with graph databases.
    """
    from decimal import Decimal

    with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

        # Convert rows to dictionaries and convert Decimal to float
        results = []
        for row in cursor.fetchall():
            row_dict = dict(row)
            # Convert Decimal to float for JSON/graph database compatibility
            for key, value in row_dict.items():
                if isinstance(value, Decimal):
                    row_dict[key] = float(value)
            results.append(row_dict)

        return results
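
A sketch of a parameterized read, assuming an open connection conn and a hypothetical orders table with a numeric total column; note that Decimal values arrive as float:

rows = conn.read(
    "SELECT id, total FROM orders WHERE total > %s",
    params=(100,),
)
for row in rows:
    print(row["id"], row["total"])  # total is a float, not a Decimal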

PostgresResourceMapper

Maps PostgreSQL tables to graflo Resources.

This class creates Resource objects that map PostgreSQL tables to graph vertices and edges, enabling ingestion of relational data into graph databases.

Source code in graflo/db/postgres/resource_mapping.py
class PostgresResourceMapper:
    """Maps PostgreSQL tables to graflo Resources.

    This class creates Resource objects that map PostgreSQL tables to graph vertices
    and edges, enabling ingestion of relational data into graph databases.
    """

    def create_vertex_resource(self, table_name: str, vertex_name: str) -> Resource:
        """Create a Resource for a vertex table.

        Args:
            table_name: Name of the PostgreSQL table
            vertex_name: Name of the vertex type (typically same as table_name)

        Returns:
            Resource: Resource configured to ingest vertex data
        """
        # Create apply list with VertexActor
        # The actor wrapper will interpret {"vertex": vertex_name} as VertexActor
        apply = [{"vertex": vertex_name}]

        resource = Resource(
            resource_name=table_name,
            apply=apply,
        )

        logger.debug(
            f"Created vertex resource '{table_name}' for vertex '{vertex_name}'"
        )

        return resource

    def create_edge_resource(
        self,
        edge_table_info: EdgeTableInfo,
        vertex_config: VertexConfig,
        match_cache: FuzzyMatchCache | None = None,
    ) -> Resource:
        """Create a Resource for an edge table.

        Args:
            edge_table_info: Edge table information from introspection
            vertex_config: Vertex configuration for source/target validation
            match_cache: Optional fuzzy match cache for better performance

        Returns:
            Resource: Resource configured to ingest edge data
        """
        table_name = edge_table_info.name
        source_table = edge_table_info.source_table
        target_table = edge_table_info.target_table
        source_column = edge_table_info.source_column
        target_column = edge_table_info.target_column
        relation = edge_table_info.relation

        # Verify source and target vertices exist
        if source_table not in vertex_config.vertex_set:
            raise ValueError(
                f"Source vertex '{source_table}' for edge table '{table_name}' "
                f"not found in vertex config"
            )

        if target_table not in vertex_config.vertex_set:
            raise ValueError(
                f"Target vertex '{target_table}' for edge table '{table_name}' "
                f"not found in vertex config"
            )

        # Get primary key fields for source and target vertices
        source_vertex_obj = vertex_config._vertices_map[source_table]
        target_vertex_obj = vertex_config._vertices_map[target_table]

        # Get the primary key field(s) from the first index (primary key)
        source_pk_fields = (
            source_vertex_obj.indexes[0].fields if source_vertex_obj.indexes else []
        )
        target_pk_fields = (
            target_vertex_obj.indexes[0].fields if target_vertex_obj.indexes else []
        )

        # Use heuristics to infer PK field names from column names
        # This handles cases like "bla_user" -> "user" vertex -> use "id" or matched field
        vertex_names = list(vertex_config.vertex_set)
        source_pk_field = self._infer_pk_field_from_column(
            source_column, source_table, source_pk_fields, vertex_names, match_cache
        )
        target_pk_field = self._infer_pk_field_from_column(
            target_column, target_table, target_pk_fields, vertex_names, match_cache
        )

        # Create apply list using source_vertex and target_vertex pattern
        # This pattern explicitly specifies which vertex type each mapping targets,
        # avoiding attribute collisions between different vertex types
        apply = []

        # First mapping: map source foreign key column to source vertex's primary key field
        if source_column:
            source_map_config = {
                "target_vertex": source_table,
                "map": {source_column: source_pk_field},
            }
            apply.append(source_map_config)

        # Second mapping: map target foreign key column to target vertex's primary key field
        if target_column:
            target_map_config = {
                "target_vertex": target_table,
                "map": {target_column: target_pk_field},
            }
            apply.append(target_map_config)

        resource = Resource(
            resource_name=table_name,
            apply=apply,
        )

        relation_info = f" with relation '{relation}'" if relation else ""
        logger.debug(
            f"Created edge resource '{table_name}' from {source_table} to {target_table}"
            f"{relation_info} "
            f"(source_col: {source_column} -> {source_pk_field}, "
            f"target_col: {target_column} -> {target_pk_field})"
        )

        return resource

    @staticmethod
    def _infer_pk_field_from_column(
        column_name: str,
        vertex_name: str,
        pk_fields: list[str],
        vertex_names: list[str],
        match_cache: FuzzyMatchCache | None = None,
    ) -> str:
        """Infer primary key field name from column name using heuristics.

        Uses fuzzy matching to identify vertex name fragments in column names,
        then matches to the appropriate PK field. Handles cases like:
        - "user_id" -> "user" vertex -> use first PK field (e.g., "id")
        - "bla_user" -> "user" vertex -> use first PK field
        - "user_id_2" -> "user" vertex -> use first PK field
        - "source_user_id" -> "user" vertex -> use first PK field
        - "bla_user" and "bla_user_2" -> both map to "user" vertex PK field

        The heuristic works by:
        1. Splitting the column name into fragments
        2. Fuzzy matching fragments to vertex names
        3. If a fragment matches the target vertex_name, use the vertex's PK field
        4. Otherwise, fall back to first PK field or "id"

        Args:
            column_name: Name of the column (e.g., "user_id", "bla_user", "bla_user_2")
            vertex_name: Name of the target vertex (already known from edge table info)
            pk_fields: List of primary key field names for the vertex
            vertex_names: List of all vertex names for fuzzy matching
            match_cache: Optional fuzzy match cache for better performance

        Returns:
            Primary key field name (defaults to first PK field or "id" if no match)
        """
        # Split column name into fragments
        separator = detect_separator(column_name)
        fragments = split_by_separator(column_name, separator)

        # Try to find a fragment that matches the target vertex name
        # This confirms that the column is indeed related to this vertex
        for fragment in fragments:
            # Fuzzy match fragment to vertex names
            if match_cache:
                matched_vertex = match_cache.get_match(fragment)
            else:
                # Fallback: create temporary matcher if cache not provided
                from .fuzzy_matcher import FuzzyMatcher

                matcher = FuzzyMatcher(vertex_names)
                matched_vertex, _ = matcher.match(fragment)

            # If we found a match to our target vertex, use its PK field
            if matched_vertex == vertex_name:
                if pk_fields:
                    # Use the first PK field (most common case is single-column PK)
                    return pk_fields[0]
                else:
                    # No PK fields available, use "id" as default
                    return "id"

        # No fragment matched the target vertex, but we still have vertex_name
        # This might happen if the column name doesn't contain the vertex name fragment
        # In this case, trust that vertex_name is correct and use its PK field
        if pk_fields:
            return pk_fields[0]

        # Last resort: use "id" as default
        # This is better than failing, but ideally pk_fields should always be available
        logger.debug(
            f"No PK fields found for vertex '{vertex_name}', using 'id' as default "
            f"for column '{column_name}'"
        )
        return "id"

    def map_tables_to_resources(
        self,
        introspection_result: SchemaIntrospectionResult,
        vertex_config: VertexConfig,
        edge_config: EdgeConfig,
    ) -> list[Resource]:
        """Map all PostgreSQL tables to Resources.

        Creates Resources for both vertex and edge tables, enabling ingestion
        of the entire database schema.

        Args:
            introspection_result: Result from PostgresConnection.introspect_schema()
            vertex_config: Inferred vertex configuration
            edge_config: Inferred edge configuration

        Returns:
            list[Resource]: List of Resources for all tables
        """
        resources = []

        # Create fuzzy match cache once for all edge tables (significant performance improvement)
        vertex_names = list(vertex_config.vertex_set)
        match_cache = FuzzyMatchCache(vertex_names)

        # Map vertex tables to resources
        vertex_tables = introspection_result.vertex_tables
        for table_info in vertex_tables:
            table_name = table_info.name
            vertex_name = table_name  # Use table name as vertex name
            resource = self.create_vertex_resource(table_name, vertex_name)
            resources.append(resource)

        # Map edge tables to resources
        edge_tables = introspection_result.edge_tables
        for edge_table_info in edge_tables:
            try:
                resource = self.create_edge_resource(
                    edge_table_info, vertex_config, match_cache
                )
                resources.append(resource)
            except ValueError as e:
                logger.warning(f"Skipping edge resource creation: {e}")
                continue

        logger.info(
            f"Mapped {len(vertex_tables)} vertex tables and {len(edge_tables)} edge tables "
            f"to {len(resources)} resources"
        )

        return resources
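
A sketch of the vertex side of the mapper; the apply-list shape in the comment follows directly from the source above:

mapper = PostgresResourceMapper()
users_resource = mapper.create_vertex_resource("users", "users")
# users_resource.apply == [{"vertex": "users"}] -- interpreted as a VertexActor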

create_edge_resource(edge_table_info, vertex_config, match_cache=None)

Create a Resource for an edge table.

Parameters:

- edge_table_info (EdgeTableInfo): Edge table information from introspection. Required.
- vertex_config (VertexConfig): Vertex configuration for source/target validation. Required.
- match_cache (FuzzyMatchCache | None): Optional fuzzy match cache for better performance. Default: None

Returns:

- Resource: Resource configured to ingest edge data

Source code in graflo/db/postgres/resource_mapping.py
def create_edge_resource(
    self,
    edge_table_info: EdgeTableInfo,
    vertex_config: VertexConfig,
    match_cache: FuzzyMatchCache | None = None,
) -> Resource:
    """Create a Resource for an edge table.

    Args:
        edge_table_info: Edge table information from introspection
        vertex_config: Vertex configuration for source/target validation
        match_cache: Optional fuzzy match cache for better performance

    Returns:
        Resource: Resource configured to ingest edge data
    """
    table_name = edge_table_info.name
    source_table = edge_table_info.source_table
    target_table = edge_table_info.target_table
    source_column = edge_table_info.source_column
    target_column = edge_table_info.target_column
    relation = edge_table_info.relation

    # Verify source and target vertices exist
    if source_table not in vertex_config.vertex_set:
        raise ValueError(
            f"Source vertex '{source_table}' for edge table '{table_name}' "
            f"not found in vertex config"
        )

    if target_table not in vertex_config.vertex_set:
        raise ValueError(
            f"Target vertex '{target_table}' for edge table '{table_name}' "
            f"not found in vertex config"
        )

    # Get primary key fields for source and target vertices
    source_vertex_obj = vertex_config._vertices_map[source_table]
    target_vertex_obj = vertex_config._vertices_map[target_table]

    # Get the primary key field(s) from the first index (primary key)
    source_pk_fields = (
        source_vertex_obj.indexes[0].fields if source_vertex_obj.indexes else []
    )
    target_pk_fields = (
        target_vertex_obj.indexes[0].fields if target_vertex_obj.indexes else []
    )

    # Use heuristics to infer PK field names from column names
    # This handles cases like "bla_user" -> "user" vertex -> use "id" or matched field
    vertex_names = list(vertex_config.vertex_set)
    source_pk_field = self._infer_pk_field_from_column(
        source_column, source_table, source_pk_fields, vertex_names, match_cache
    )
    target_pk_field = self._infer_pk_field_from_column(
        target_column, target_table, target_pk_fields, vertex_names, match_cache
    )

    # Create apply list using source_vertex and target_vertex pattern
    # This pattern explicitly specifies which vertex type each mapping targets,
    # avoiding attribute collisions between different vertex types
    apply = []

    # First mapping: map source foreign key column to source vertex's primary key field
    if source_column:
        source_map_config = {
            "target_vertex": source_table,
            "map": {source_column: source_pk_field},
        }
        apply.append(source_map_config)

    # Second mapping: map target foreign key column to target vertex's primary key field
    if target_column:
        target_map_config = {
            "target_vertex": target_table,
            "map": {target_column: target_pk_field},
        }
        apply.append(target_map_config)

    resource = Resource(
        resource_name=table_name,
        apply=apply,
    )

    relation_info = f" with relation '{relation}'" if relation else ""
    logger.debug(
        f"Created edge resource '{table_name}' from {source_table} to {target_table}"
        f"{relation_info} "
        f"(source_col: {source_column} -> {source_pk_field}, "
        f"target_col: {target_column} -> {target_pk_field})"
    )

    return resource

create_vertex_resource(table_name, vertex_name)

Create a Resource for a vertex table.

Parameters:

- table_name (str): Name of the PostgreSQL table. Required.
- vertex_name (str): Name of the vertex type (typically same as table_name). Required.

Returns:

- Resource: Resource configured to ingest vertex data

Source code in graflo/db/postgres/resource_mapping.py
def create_vertex_resource(self, table_name: str, vertex_name: str) -> Resource:
    """Create a Resource for a vertex table.

    Args:
        table_name: Name of the PostgreSQL table
        vertex_name: Name of the vertex type (typically same as table_name)

    Returns:
        Resource: Resource configured to ingest vertex data
    """
    # Create apply list with VertexActor
    # The actor wrapper will interpret {"vertex": vertex_name} as VertexActor
    apply = [{"vertex": vertex_name}]

    resource = Resource(
        resource_name=table_name,
        apply=apply,
    )

    logger.debug(
        f"Created vertex resource '{table_name}' for vertex '{vertex_name}'"
    )

    return resource

map_tables_to_resources(introspection_result, vertex_config, edge_config)

Map all PostgreSQL tables to Resources.

Creates Resources for both vertex and edge tables, enabling ingestion of the entire database schema.

Parameters:

- introspection_result (SchemaIntrospectionResult): Result from PostgresConnection.introspect_schema(). Required.
- vertex_config (VertexConfig): Inferred vertex configuration. Required.
- edge_config (EdgeConfig): Inferred edge configuration. Required.

Returns:

- list[Resource]: List of Resources for all tables

Source code in graflo/db/postgres/resource_mapping.py
def map_tables_to_resources(
    self,
    introspection_result: SchemaIntrospectionResult,
    vertex_config: VertexConfig,
    edge_config: EdgeConfig,
) -> list[Resource]:
    """Map all PostgreSQL tables to Resources.

    Creates Resources for both vertex and edge tables, enabling ingestion
    of the entire database schema.

    Args:
        introspection_result: Result from PostgresConnection.introspect_schema()
        vertex_config: Inferred vertex configuration
        edge_config: Inferred edge configuration

    Returns:
        list[Resource]: List of Resources for all tables
    """
    resources = []

    # Create fuzzy match cache once for all edge tables (significant performance improvement)
    vertex_names = list(vertex_config.vertex_set)
    match_cache = FuzzyMatchCache(vertex_names)

    # Map vertex tables to resources
    vertex_tables = introspection_result.vertex_tables
    for table_info in vertex_tables:
        table_name = table_info.name
        vertex_name = table_name  # Use table name as vertex name
        resource = self.create_vertex_resource(table_name, vertex_name)
        resources.append(resource)

    # Map edge tables to resources
    edge_tables = introspection_result.edge_tables
    for edge_table_info in edge_tables:
        try:
            resource = self.create_edge_resource(
                edge_table_info, vertex_config, match_cache
            )
            resources.append(resource)
        except ValueError as e:
            logger.warning(f"Skipping edge resource creation: {e}")
            continue

    logger.info(
        f"Mapped {len(vertex_tables)} vertex tables and {len(edge_tables)} edge tables "
        f"to {len(resources)} resources"
    )

    return resources
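
A sketch of the full mapping step, assuming an open connection conn and that the vertex and edge configs have already been inferred (see PostgresSchemaInferencer below):

result = conn.introspect_schema(schema_name="public")
inferencer = PostgresSchemaInferencer(conn=conn)
vertex_config = inferencer.infer_vertex_config(result)
edge_config = inferencer.infer_edge_config(result, vertex_config)

mapper = PostgresResourceMapper()
resources = mapper.map_tables_to_resources(result, vertex_config, edge_config)
print([r.resource_name for r in resources])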

PostgresSchemaInferencer

Infers graflo Schema from PostgreSQL schema introspection results.

This class takes the output from PostgresConnection.introspect_schema() and generates a complete graflo Schema with vertices, edges, and weights.

Source code in graflo/db/postgres/schema_inference.py
class PostgresSchemaInferencer:
    """Infers graflo Schema from PostgreSQL schema introspection results.

    This class takes the output from PostgresConnection.introspect_schema() and
    generates a complete graflo Schema with vertices, edges, and weights.
    """

    def __init__(
        self,
        db_flavor: DBFlavor = DBFlavor.ARANGO,
        conn: PostgresConnection | None = None,
    ):
        """Initialize the schema inferencer.

        Args:
            db_flavor: Target database flavor for the inferred schema
            conn: PostgreSQL connection for sampling data to infer types (optional)
        """
        self.db_flavor = db_flavor
        self.type_mapper = PostgresTypeMapper()
        self.conn = conn

    def infer_vertex_config(
        self, introspection_result: SchemaIntrospectionResult
    ) -> VertexConfig:
        """Infer VertexConfig from vertex tables.

        Args:
            introspection_result: Result from PostgresConnection.introspect_schema()

        Returns:
            VertexConfig: Inferred vertex configuration
        """
        vertex_tables = introspection_result.vertex_tables
        vertices = []

        for table_info in vertex_tables:
            table_name = table_info.name
            columns = table_info.columns
            pk_columns = table_info.primary_key

            # Create fields from columns
            fields = []
            for col in columns:
                field_name = col.name
                field_type = self.type_mapper.map_type(col.type)
                fields.append(Field(name=field_name, type=field_type))

            # Create indexes from primary key
            indexes = []
            if pk_columns:
                indexes.append(
                    Index(fields=pk_columns, type=IndexType.PERSISTENT, unique=True)
                )

            # Create vertex
            vertex = Vertex(
                name=table_name,
                dbname=table_name,
                fields=fields,
                indexes=indexes,
            )

            vertices.append(vertex)
            logger.debug(
                f"Inferred vertex '{table_name}' with {len(fields)} fields and "
                f"{len(indexes)} indexes"
            )

        return VertexConfig(vertices=vertices, db_flavor=self.db_flavor)

    def _infer_type_from_samples(
        self, table_name: str, schema_name: str, column_name: str, pg_type: str
    ) -> str:
        """Infer field type by sampling 5 rows from the table.

        Uses heuristics to determine if a column contains integers, floats, datetimes, etc.
        Falls back to PostgreSQL type mapping if sampling fails or is unavailable.

        Args:
            table_name: Name of the table
            schema_name: Schema name
            column_name: Name of the column to sample
            pg_type: PostgreSQL type from schema introspection

        Returns:
            str: FieldType value (INT, FLOAT, DATETIME, STRING, etc.)
        """
        # First try PostgreSQL type mapping
        mapped_type = self.type_mapper.map_type(pg_type)

        # If we have a connection, sample data to refine the type
        if self.conn is None:
            logger.debug(
                f"No connection available for sampling, using mapped type '{mapped_type}' "
                f"for column '{column_name}' in table '{table_name}'"
            )
            return mapped_type

        try:
            # Sample 5 rows from the table
            query = (
                f'SELECT "{column_name}" FROM "{schema_name}"."{table_name}" LIMIT 5'
            )
            samples = self.conn.read(query)

            if not samples:
                logger.debug(
                    f"No samples found for column '{column_name}' in table '{table_name}', "
                    f"using mapped type '{mapped_type}'"
                )
                return mapped_type

            # Extract non-None values
            values = [
                row[column_name] for row in samples if row[column_name] is not None
            ]

            if not values:
                logger.debug(
                    f"All samples are NULL for column '{column_name}' in table '{table_name}', "
                    f"using mapped type '{mapped_type}'"
                )
                return mapped_type

            # Heuristics to infer type from values
            # Check for integers (all values are integers)
            if all(isinstance(v, int) for v in values):
                logger.debug(
                    f"Inferred INT type for column '{column_name}' in table '{table_name}' "
                    f"from samples"
                )
                return FieldType.INT.value

            # Check for floats (all values are floats or ints that could be floats)
            if all(isinstance(v, (int, float)) for v in values):
                # If any value has decimal part, it's a float
                if any(isinstance(v, float) and v != float(int(v)) for v in values):
                    logger.debug(
                        f"Inferred FLOAT type for column '{column_name}' in table '{table_name}' "
                        f"from samples"
                    )
                    return FieldType.FLOAT.value
                # All integers, but might be stored as float - check PostgreSQL type
                if mapped_type == FieldType.FLOAT.value:
                    return FieldType.FLOAT.value
                return FieldType.INT.value

            # Check for datetime/date objects
            from datetime import date, datetime, time

            if all(isinstance(v, (datetime, date, time)) for v in values):
                logger.debug(
                    f"Inferred DATETIME type for column '{column_name}' in table '{table_name}' "
                    f"from samples"
                )
                return FieldType.DATETIME.value

            # Check for ISO format datetime strings
            if all(isinstance(v, str) for v in values):
                # Try to parse as ISO datetime
                iso_datetime_count = 0
                for v in values:
                    try:
                        # Try ISO format (with or without timezone)
                        datetime.fromisoformat(v.replace("Z", "+00:00"))
                        iso_datetime_count += 1
                    except (ValueError, AttributeError):
                        # Try other common formats
                        try:
                            datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
                            iso_datetime_count += 1
                        except ValueError:
                            try:
                                datetime.strptime(v, "%Y-%m-%d")
                                iso_datetime_count += 1
                            except ValueError:
                                pass

                # If most values look like datetimes, infer DATETIME
                if iso_datetime_count >= len(values) * 0.8:  # 80% threshold
                    logger.debug(
                        f"Inferred DATETIME type for column '{column_name}' in table '{table_name}' "
                        f"from ISO format strings"
                    )
                    return FieldType.DATETIME.value

            # Default to mapped type
            logger.debug(
                f"Using mapped type '{mapped_type}' for column '{column_name}' in table '{table_name}' "
                f"(could not infer from samples)"
            )
            return mapped_type

        except Exception as e:
            logger.warning(
                f"Error sampling data for column '{column_name}' in table '{table_name}': {e}. "
                f"Using mapped type '{mapped_type}'"
            )
            return mapped_type

    def infer_edge_weights(self, edge_table_info: EdgeTableInfo) -> WeightConfig | None:
        """Infer edge weights from edge table columns with types.

        Uses PostgreSQL column types and optionally samples data to infer accurate types.

        Args:
            edge_table_info: Edge table information from introspection

        Returns:
            WeightConfig if there are weight columns, None otherwise
        """
        columns = edge_table_info.columns
        pk_columns = set(edge_table_info.primary_key)
        fk_columns = {fk.column for fk in edge_table_info.foreign_keys}

        # Find non-PK, non-FK columns (these become weights)
        weight_columns = [
            col
            for col in columns
            if col.name not in pk_columns and col.name not in fk_columns
        ]

        if not weight_columns:
            return None

        # Create Field objects with types for each weight column
        direct_weights = []
        for col in weight_columns:
            # Infer type: use PostgreSQL type first, then sample if needed
            field_type = self._infer_type_from_samples(
                edge_table_info.name,
                edge_table_info.schema_name,
                col.name,
                col.type,
            )
            direct_weights.append(Field(name=col.name, type=field_type))

        logger.debug(
            f"Inferred {len(direct_weights)} weights for edge table "
            f"'{edge_table_info.name}': {[f.name for f in direct_weights]}"
        )

        return WeightConfig(direct=direct_weights)

    def infer_edge_config(
        self,
        introspection_result: SchemaIntrospectionResult,
        vertex_config: VertexConfig,
    ) -> EdgeConfig:
        """Infer EdgeConfig from edge tables.

        Args:
            introspection_result: Result from PostgresConnection.introspect_schema()
            vertex_config: Inferred vertex configuration

        Returns:
            EdgeConfig: Inferred edge configuration
        """
        edge_tables = introspection_result.edge_tables
        edges = []

        vertex_names = vertex_config.vertex_set

        for edge_table_info in edge_tables:
            table_name = edge_table_info.name
            source_table = edge_table_info.source_table
            target_table = edge_table_info.target_table

            # Verify source and target vertices exist
            if source_table not in vertex_names:
                logger.warning(
                    f"Source vertex '{source_table}' for edge table '{table_name}' "
                    f"not found in vertex config, skipping"
                )
                continue

            if target_table not in vertex_names:
                logger.warning(
                    f"Target vertex '{target_table}' for edge table '{table_name}' "
                    f"not found in vertex config, skipping"
                )
                continue

            # Infer weights
            weights = self.infer_edge_weights(edge_table_info)
            indexes = []
            # Create edge
            edge = Edge(
                source=source_table,
                target=target_table,
                indexes=indexes,
                weights=weights,
                relation=edge_table_info.relation,
            )

            edges.append(edge)
            logger.debug(
                f"Inferred edge '{table_name}' from {source_table} to {target_table}"
            )

        return EdgeConfig(edges=edges)

    def infer_schema(
        self,
        introspection_result: SchemaIntrospectionResult,
        schema_name: str | None = None,
    ) -> Schema:
        """Infer complete Schema from PostgreSQL introspection.

        Args:
            introspection_result: Result from PostgresConnection.introspect_schema()
            schema_name: Schema name (defaults to schema_name from introspection if None)

        Returns:
            Schema: Complete inferred schema with vertices, edges, and metadata
        """
        if schema_name is None:
            schema_name = introspection_result.schema_name

        logger.info(f"Inferring schema from PostgreSQL schema '{schema_name}'")

        # Infer vertex configuration
        vertex_config = self.infer_vertex_config(introspection_result)
        logger.info(f"Inferred {len(vertex_config.vertices)} vertices")

        # Infer edge configuration
        edge_config = self.infer_edge_config(introspection_result, vertex_config)
        edges_count = len(list(edge_config.edges_list()))
        logger.info(f"Inferred {edges_count} edges")

        # Create schema metadata
        metadata = SchemaMetadata(name=schema_name)

        # Create schema (resources will be added separately)
        schema = Schema(
            general=metadata,
            vertex_config=vertex_config,
            edge_config=edge_config,
            resources=[],  # Resources will be created separately
        )

        logger.info(
            f"Successfully inferred schema '{schema_name}' with "
            f"{len(vertex_config.vertices)} vertices and "
            f"{len(list(edge_config.edges_list()))} edges"
        )

        return schema
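
A sketch of driving the inferencer directly rather than via the convenience function; passing conn enables the sampling-based type refinement described below:

from graflo.onto import DBFlavor

result = conn.introspect_schema(schema_name="public")
inferencer = PostgresSchemaInferencer(db_flavor=DBFlavor.ARANGO, conn=conn)
schema = inferencer.infer_schema(result)
print(f"{len(schema.vertex_config.vertices)} vertices inferred")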

__init__(db_flavor=DBFlavor.ARANGO, conn=None)

Initialize the schema inferencer.

Parameters:

- db_flavor (DBFlavor): Target database flavor for the inferred schema. Default: ARANGO
- conn (PostgresConnection | None): PostgreSQL connection for sampling data to infer types (optional). Default: None

Source code in graflo/db/postgres/schema_inference.py
def __init__(
    self,
    db_flavor: DBFlavor = DBFlavor.ARANGO,
    conn: PostgresConnection | None = None,
):
    """Initialize the schema inferencer.

    Args:
        db_flavor: Target database flavor for the inferred schema
        conn: PostgreSQL connection for sampling data to infer types (optional)
    """
    self.db_flavor = db_flavor
    self.type_mapper = PostgresTypeMapper()
    self.conn = conn

infer_edge_config(introspection_result, vertex_config)

Infer EdgeConfig from edge tables.

Parameters:

- introspection_result (SchemaIntrospectionResult): Result from PostgresConnection.introspect_schema(). Required.
- vertex_config (VertexConfig): Inferred vertex configuration. Required.

Returns:

- EdgeConfig: Inferred edge configuration

Source code in graflo/db/postgres/schema_inference.py
def infer_edge_config(
    self,
    introspection_result: SchemaIntrospectionResult,
    vertex_config: VertexConfig,
) -> EdgeConfig:
    """Infer EdgeConfig from edge tables.

    Args:
        introspection_result: Result from PostgresConnection.introspect_schema()
        vertex_config: Inferred vertex configuration

    Returns:
        EdgeConfig: Inferred edge configuration
    """
    edge_tables = introspection_result.edge_tables
    edges = []

    vertex_names = vertex_config.vertex_set

    for edge_table_info in edge_tables:
        table_name = edge_table_info.name
        source_table = edge_table_info.source_table
        target_table = edge_table_info.target_table

        # Verify source and target vertices exist
        if source_table not in vertex_names:
            logger.warning(
                f"Source vertex '{source_table}' for edge table '{table_name}' "
                f"not found in vertex config, skipping"
            )
            continue

        if target_table not in vertex_names:
            logger.warning(
                f"Target vertex '{target_table}' for edge table '{table_name}' "
                f"not found in vertex config, skipping"
            )
            continue

        # Infer weights
        weights = self.infer_edge_weights(edge_table_info)
        indexes = []
        # Create edge
        edge = Edge(
            source=source_table,
            target=target_table,
            indexes=indexes,
            weights=weights,
            relation=edge_table_info.relation,
        )

        edges.append(edge)
        logger.debug(
            f"Inferred edge '{table_name}' from {source_table} to {target_table}"
        )

    return EdgeConfig(edges=edges)

infer_edge_weights(edge_table_info)

Infer edge weights from edge table columns with types.

Uses PostgreSQL column types and optionally samples data to infer accurate types.

Parameters:

- edge_table_info (EdgeTableInfo): Edge table information from introspection. Required.

Returns:

- WeightConfig | None: WeightConfig if there are weight columns, None otherwise

Source code in graflo/db/postgres/schema_inference.py
def infer_edge_weights(self, edge_table_info: EdgeTableInfo) -> WeightConfig | None:
    """Infer edge weights from edge table columns with types.

    Uses PostgreSQL column types and optionally samples data to infer accurate types.

    Args:
        edge_table_info: Edge table information from introspection

    Returns:
        WeightConfig if there are weight columns, None otherwise
    """
    columns = edge_table_info.columns
    pk_columns = set(edge_table_info.primary_key)
    fk_columns = {fk.column for fk in edge_table_info.foreign_keys}

    # Find non-PK, non-FK columns (these become weights)
    weight_columns = [
        col
        for col in columns
        if col.name not in pk_columns and col.name not in fk_columns
    ]

    if not weight_columns:
        return None

    # Create Field objects with types for each weight column
    direct_weights = []
    for col in weight_columns:
        # Infer type: use PostgreSQL type first, then sample if needed
        field_type = self._infer_type_from_samples(
            edge_table_info.name,
            edge_table_info.schema_name,
            col.name,
            col.type,
        )
        direct_weights.append(Field(name=col.name, type=field_type))

    logger.debug(
        f"Inferred {len(direct_weights)} weights for edge table "
        f"'{edge_table_info.name}': {[f.name for f in direct_weights]}"
    )

    return WeightConfig(direct=direct_weights)
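
As an illustration of the PK/FK filter above, consider a hypothetical follows join table; only the columns that belong to neither the primary key nor a foreign key become weights. Here inferencer is a PostgresSchemaInferencer and follows_table_info is the EdgeTableInfo produced by introspection:

# follows: columns      = [source_id, target_id, since, strength]
#          primary_key  = [source_id, target_id]
#          foreign_keys = [source_id, target_id]
weights = inferencer.infer_edge_weights(follows_table_info)
# assuming WeightConfig exposes its direct weights as .direct
print([f.name for f in weights.direct])  # ['since', 'strength']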

infer_schema(introspection_result, schema_name=None)

Infer complete Schema from PostgreSQL introspection.

Parameters:

- introspection_result (SchemaIntrospectionResult): Result from PostgresConnection.introspect_schema(). Required.
- schema_name (str | None): Schema name (defaults to schema_name from introspection if None). Default: None

Returns:

- Schema: Complete inferred schema with vertices, edges, and metadata

Source code in graflo/db/postgres/schema_inference.py
def infer_schema(
    self,
    introspection_result: SchemaIntrospectionResult,
    schema_name: str | None = None,
) -> Schema:
    """Infer complete Schema from PostgreSQL introspection.

    Args:
        introspection_result: Result from PostgresConnection.introspect_schema()
        schema_name: Schema name (defaults to schema_name from introspection if None)

    Returns:
        Schema: Complete inferred schema with vertices, edges, and metadata
    """
    if schema_name is None:
        schema_name = introspection_result.schema_name

    logger.info(f"Inferring schema from PostgreSQL schema '{schema_name}'")

    # Infer vertex configuration
    vertex_config = self.infer_vertex_config(introspection_result)
    logger.info(f"Inferred {len(vertex_config.vertices)} vertices")

    # Infer edge configuration
    edge_config = self.infer_edge_config(introspection_result, vertex_config)
    edges_count = len(list(edge_config.edges_list()))
    logger.info(f"Inferred {edges_count} edges")

    # Create schema metadata
    metadata = SchemaMetadata(name=schema_name)

    # Create schema (resources will be added separately)
    schema = Schema(
        general=metadata,
        vertex_config=vertex_config,
        edge_config=edge_config,
        resources=[],  # Resources will be created separately
    )

    logger.info(
        f"Successfully inferred schema '{schema_name}' with "
        f"{len(vertex_config.vertices)} vertices and "
        f"{len(list(edge_config.edges_list()))} edges"
    )

    return schema

infer_vertex_config(introspection_result)

Infer VertexConfig from vertex tables.

Parameters:

- introspection_result (SchemaIntrospectionResult): Result from PostgresConnection.introspect_schema(). Required.

Returns:

- VertexConfig: Inferred vertex configuration

Source code in graflo/db/postgres/schema_inference.py
def infer_vertex_config(
    self, introspection_result: SchemaIntrospectionResult
) -> VertexConfig:
    """Infer VertexConfig from vertex tables.

    Args:
        introspection_result: Result from PostgresConnection.introspect_schema()

    Returns:
        VertexConfig: Inferred vertex configuration
    """
    vertex_tables = introspection_result.vertex_tables
    vertices = []

    for table_info in vertex_tables:
        table_name = table_info.name
        columns = table_info.columns
        pk_columns = table_info.primary_key

        # Create fields from columns
        fields = []
        for col in columns:
            field_name = col.name
            field_type = self.type_mapper.map_type(col.type)
            fields.append(Field(name=field_name, type=field_type))

        # Create indexes from primary key
        indexes = []
        if pk_columns:
            indexes.append(
                Index(fields=pk_columns, type=IndexType.PERSISTENT, unique=True)
            )

        # Create vertex
        vertex = Vertex(
            name=table_name,
            dbname=table_name,
            fields=fields,
            indexes=indexes,
        )

        vertices.append(vertex)
        logger.debug(
            f"Inferred vertex '{table_name}' with {len(fields)} fields and "
            f"{len(indexes)} indexes"
        )

    return VertexConfig(vertices=vertices, db_flavor=self.db_flavor)

create_patterns_from_postgres(conn, schema_name=None)

Create Patterns from PostgreSQL tables.

Parameters:

- conn (PostgresConnection): PostgresConnection instance. Required.
- schema_name (str | None): Schema name to introspect. Default: None

Returns:

- Patterns: Patterns object with TablePattern instances for all tables

Source code in graflo/db/postgres/heuristics.py
def create_patterns_from_postgres(
    conn: PostgresConnection, schema_name: str | None = None
) -> Patterns:
    """Create Patterns from PostgreSQL tables.

    Args:
        conn: PostgresConnection instance
        schema_name: Schema name to introspect

    Returns:
        Patterns: Patterns object with TablePattern instances for all tables
    """

    # Introspect the schema
    introspection_result = conn.introspect_schema(schema_name=schema_name)

    # Create patterns
    patterns = Patterns()

    # Get schema name
    effective_schema = schema_name or introspection_result.schema_name

    # Store the connection config
    config_key = "default"
    patterns.postgres_configs[(config_key, effective_schema)] = conn.config

    # Add patterns for vertex tables
    for table_info in introspection_result.vertex_tables:
        table_name = table_info.name
        table_pattern = TablePattern(
            table_name=table_name,
            schema_name=effective_schema,
            resource_name=table_name,
        )
        patterns.patterns[table_name] = table_pattern
        patterns.postgres_table_configs[table_name] = (
            config_key,
            effective_schema,
            table_name,
        )

    # Add patterns for edge tables
    for table_info in introspection_result.edge_tables:
        table_name = table_info.name
        table_pattern = TablePattern(
            table_name=table_name,
            schema_name=effective_schema,
            resource_name=table_name,
        )
        patterns.patterns[table_name] = table_pattern
        patterns.postgres_table_configs[table_name] = (
            config_key,
            effective_schema,
            table_name,
        )

    return patterns
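
A usage sketch, assuming an open PostgresConnection conn; every introspected table gets a TablePattern keyed by its name:

patterns = create_patterns_from_postgres(conn, schema_name="public")
print(sorted(patterns.patterns))        # one TablePattern per table
print(patterns.postgres_table_configs)  # table -> (config_key, schema, table)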

create_resources_from_postgres(conn, schema, schema_name=None)

Create Resources from PostgreSQL tables for an existing schema.

Parameters:

- conn (PostgresConnection): PostgresConnection instance. Required.
- schema: Existing Schema object. Required.
- schema_name (str | None): Schema name to introspect. Default: None

Returns:

- list[Resource]: List of Resources for PostgreSQL tables

Source code in graflo/db/postgres/heuristics.py
def create_resources_from_postgres(
    conn: PostgresConnection, schema, schema_name: str | None = None
):
    """Create Resources from PostgreSQL tables for an existing schema.

    Args:
        conn: PostgresConnection instance
        schema: Existing Schema object
        schema_name: Schema name to introspect

    Returns:
        list[Resource]: List of Resources for PostgreSQL tables
    """
    # Introspect the schema
    introspection_result = conn.introspect_schema(schema_name=schema_name)

    # Map tables to resources
    mapper = PostgresResourceMapper()
    resources = mapper.map_tables_to_resources(
        introspection_result, schema.vertex_config, schema.edge_config
    )

    return resources
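
A usage sketch, assuming an open connection conn and an existing Schema object named schema; the re-initialization call mirrors what infer_schema_from_postgres does below:

resources = create_resources_from_postgres(conn, schema, schema_name="public")
schema.resources = resources
schema.__post_init__()  # re-establish resource mappings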

infer_schema_from_postgres(conn, schema_name=None, db_flavor=None)

Convenience function to infer a graflo Schema from a PostgreSQL database.

Parameters:

- conn (PostgresConnection): PostgresConnection instance. Required.
- schema_name (str | None): Schema name to introspect (defaults to config schema_name or 'public'). Default: None
- db_flavor: Target database flavor (defaults to ARANGO). Default: None

Returns:

- Schema: Inferred schema with vertices, edges, and resources

Source code in graflo/db/postgres/heuristics.py
def infer_schema_from_postgres(
    conn: PostgresConnection, schema_name: str | None = None, db_flavor=None
):
    """Convenience function to infer a graflo Schema from PostgreSQL database.

    Args:
        conn: PostgresConnection instance
        schema_name: Schema name to introspect (defaults to config schema_name or 'public')
        db_flavor: Target database flavor (defaults to ARANGO)

    Returns:
        Schema: Inferred schema with vertices, edges, and resources
    """
    from graflo.onto import DBFlavor

    if db_flavor is None:
        db_flavor = DBFlavor.ARANGO

    # Introspect the schema
    introspection_result = conn.introspect_schema(schema_name=schema_name)

    # Infer schema (pass connection for type sampling)
    inferencer = PostgresSchemaInferencer(db_flavor=db_flavor, conn=conn)
    schema = inferencer.infer_schema(introspection_result, schema_name=schema_name)

    # Create and add resources
    mapper = PostgresResourceMapper()
    resources = mapper.map_tables_to_resources(
        introspection_result, schema.vertex_config, schema.edge_config
    )

    # Update schema with resources
    schema.resources = resources
    # Re-initialize to set up resource mappings
    schema.__post_init__()

    return schema
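
A sketch of overriding the target flavor; db_flavor accepts a DBFlavor member and falls back to ARANGO when omitted:

from graflo.onto import DBFlavor

schema = infer_schema_from_postgres(
    conn,
    schema_name="public",
    db_flavor=DBFlavor.ARANGO,  # explicit here; substitute another flavor supported by graflo
)
print(f"{len(schema.resources)} resources attached")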