GitHub user sergem155 added a comment to the discussion: migrate database 
sqlite to postgres

thanks for the inspiration. below works for 6.0.0:

```
import sqlite3
import psycopg2
import uuid
import sys
from collections import defaultdict

# Tables to exclude from migration (managed by alembic or system tables)
EXCLUDED_TABLES = {'alembic_version'}


def get_tables_in_dependency_order(pg_cursor):
        """
        Get all tables from PostgreSQL and return them sorted in dependency 
order.
        Tables with no dependencies come first, tables that depend on others 
come later.
        This order is correct for INSERT operations.
        For DELETE operations, reverse this list.
        """
        # Get all tables in public schema
        pg_cursor.execute("""
                SELECT table_name 
                FROM information_schema.tables 
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
        """)
        all_tables = {row[0] for row in pg_cursor.fetchall()}
        
        # Get foreign key dependencies: child_table -> parent_table
        # (child_table has FK pointing to parent_table)
        pg_cursor.execute("""
                SELECT 
                        tc.table_name as child_table,
                        ccu.table_name as parent_table
                FROM information_schema.table_constraints tc
                JOIN information_schema.constraint_column_usage ccu 
                        ON tc.constraint_name = ccu.constraint_name 
                        AND tc.table_schema = ccu.table_schema
                WHERE tc.constraint_type = 'FOREIGN KEY'
                        AND tc.table_schema = 'public'
                        AND tc.table_name != ccu.table_name
        """)
        
        # Build dependency graph: table -> set of tables it depends on (parents)
        dependencies = defaultdict(set)
        for child, parent in pg_cursor.fetchall():
                if child in all_tables and parent in all_tables:
                        dependencies[child].add(parent)
        
        # Topological sort using Kahn's algorithm
        # Calculate in-degree (number of dependencies) for each table
        in_degree = {table: len(dependencies[table]) for table in all_tables}
        
        # Start with tables that have no dependencies
        queue = [table for table in all_tables if in_degree[table] == 0]
        sorted_tables = []
        
        while queue:
                # Sort queue for deterministic ordering
                queue.sort()
                table = queue.pop(0)
                sorted_tables.append(table)
                
                # For each table that depends on this one, reduce its in-degree
                for other_table in all_tables:
                        if table in dependencies[other_table]:
                                in_degree[other_table] -= 1
                                if in_degree[other_table] == 0:
                                        queue.append(other_table)
        
        # Check for circular dependencies
        if len(sorted_tables) != len(all_tables):
                remaining = all_tables - set(sorted_tables)
                print(f"WARNING: Circular dependencies detected involving 
tables: {remaining}")
                # Add remaining tables at the end (may cause FK issues)
                sorted_tables.extend(sorted(remaining))
        
        return sorted_tables


def upload_data_to_postgres(sqlite_path: str, pg_conn_string: str):
        sqlite_conn = sqlite3.connect(sqlite_path)
        pg_conn = psycopg2.connect(pg_conn_string)

        # Step 1: Verify alembic_version matches between SQLite and PostgreSQL
        sqlite_cursor = sqlite_conn.cursor()
        pg_cursor = pg_conn.cursor()
        
        sqlite_cursor.execute("SELECT version_num FROM alembic_version")
        sqlite_version_row = sqlite_cursor.fetchone()
        sqlite_version = sqlite_version_row[0] if sqlite_version_row else None
        
        pg_cursor.execute("SELECT version_num FROM alembic_version")
        pg_version_row = pg_cursor.fetchone()
        pg_version = pg_version_row[0] if pg_version_row else None
        
        print(f"SQLite alembic_version: {sqlite_version}")
        print(f"PostgreSQL alembic_version: {pg_version}")
        
        if sqlite_version != pg_version:
                print(f"\nERROR: alembic_version mismatch!")
                print(f"  SQLite:     {sqlite_version}")
                print(f"  PostgreSQL: {pg_version}")
                print("The database schemas must be at the same migration 
version before migrating data.")
                print("Migration aborted.")
                sqlite_conn.close()
                pg_conn.close()
                sys.exit(1)
        
        print("alembic_version check passed.\n")

        # Step 2: Get tables in dependency order from PostgreSQL
        print("Analyzing table dependencies...")
        all_pg_tables = get_tables_in_dependency_order(pg_cursor)
        tables_to_migrate = [t for t in all_pg_tables if t not in 
EXCLUDED_TABLES]
        print(f"Found {len(tables_to_migrate)} tables to migrate (in dependency 
order):\n")
        for i, t in enumerate(tables_to_migrate, 1):
                print(f"  {i:2}. {t}")
        print()

        # Step 3: Verify all SQLite tables are present in PostgreSQL
        sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE 
type='table' AND name NOT LIKE 'sqlite_%'")
        sqlite_tables = {row[0] for row in sqlite_cursor.fetchall()} - 
EXCLUDED_TABLES
        pg_tables_set = set(tables_to_migrate)
        
        missing_in_pg = sqlite_tables - pg_tables_set
        missing_in_sqlite = pg_tables_set - sqlite_tables
        
        if missing_in_pg:
                print("WARNING: Tables in SQLite but NOT in PostgreSQL (cannot 
migrate):")
                for t in sorted(missing_in_pg):
                        print(f"  - {t}")
                print()
        
        if missing_in_sqlite:
                print("INFO: Tables in PostgreSQL but NOT in SQLite (will be 
emptied):")
                for t in sorted(missing_in_sqlite):
                        print(f"  - {t}")
                print()
        
        # Only migrate tables that exist in both databases
        tables_to_migrate = [t for t in tables_to_migrate if t in sqlite_tables]

        for table_name in reversed(tables_to_migrate):
                print("deleting " + table_name)
                pg_cursor.execute(f"delete from {table_name}")

        for table_name in tables_to_migrate:
                print("fetching " + table_name)
                sqlite_cursor = sqlite_conn.cursor()
                sqlite_cursor.execute(f"SELECT * FROM {table_name}")
                tables = sqlite_cursor.fetchall()
                columns_types = (
                        sqlite_conn.cursor().execute(f"PRAGMA 
table_info({table_name})").fetchall()
                )

                columns = ", ".join([f'"{desc[1]}"' for desc in columns_types])
                values = ", ".join(["%s"] * len(columns_types))
                print("executing " + table_name)

                for row in tables:
                        query = f"INSERT INTO {table_name} ({columns}) VALUES 
({values})"
                        row_casted = []
                        for i, cell in enumerate(row):
                                if columns_types[i][2] == "BOOLEAN":
                                        row_casted.append(bool(cell))
                                elif columns_types[i][2] == "NUMERIC(16)":
                                        
row_casted.append(str(uuid.UUID(bytes=cell)))
                                else:
                                        row_casted.append(cell)
                        pg_cursor.execute(query, row_casted)

                pg_conn.commit()

        # Reset all sequences for columns with auto-increment/serial in 
migrated tables
        # First, get all existing sequences in the database
        pg_cursor.execute("""
                SELECT sequence_name FROM information_schema.sequences 
                WHERE sequence_schema = 'public'
        """)
        all_sequences = {row[0] for row in pg_cursor.fetchall()}
        all_sequences_reset = set()
        
        for table_name in tables_to_migrate:
                print(f"checking sequences for {table_name}")
                
                # Method 1: Find sequences linked via pg_depend (deptype 'a' or 
'i')
                pg_cursor.execute("""
                        SELECT a.attname as column_name, 
                               s.relname as seq_name,
                               n.nspname as seq_schema
                        FROM pg_class t
                        JOIN pg_namespace tn ON t.relnamespace = tn.oid
                        JOIN pg_depend d ON d.refobjid = t.oid
                        JOIN pg_class s ON s.oid = d.objid
                        JOIN pg_namespace n ON s.relnamespace = n.oid
                        LEFT JOIN pg_attribute a ON a.attrelid = t.oid AND 
a.attnum = d.refobjsubid
                        WHERE t.relname = %s
                          AND tn.nspname = 'public'
                          AND s.relkind = 'S'
                          AND d.deptype IN ('a', 'i')
                """, (table_name,))
                columns_info = pg_cursor.fetchall()
                
                for column_name, seq_name, seq_schema in columns_info:
                        if seq_name and column_name:
                                print(f"  resetting sequence {seq_name} for 
{table_name}.{column_name} (via pg_depend)")
                                pg_cursor.execute(f"""
                                        SELECT 
setval('{seq_schema}.{seq_name}', COALESCE((SELECT MAX("{column_name}") FROM 
{table_name}), 0) + 1, false)
                                """)
                                all_sequences_reset.add(seq_name)
                
                # Method 2: Fallback - check for sequences by naming convention 
{table}_id_seq
                # This handles cases where sequences exist but aren't linked 
via pg_depend
                conventional_seq_name = f"{table_name}_id_seq"
                if conventional_seq_name in all_sequences and 
conventional_seq_name not in all_sequences_reset:
                        # Check if table has an 'id' column
                        pg_cursor.execute("""
                                SELECT column_name FROM 
information_schema.columns
                                WHERE table_name = %s AND table_schema = 
'public' AND column_name = 'id'
                        """, (table_name,))
                        if pg_cursor.fetchone():
                                print(f"  resetting sequence 
{conventional_seq_name} for {table_name}.id (via naming convention)")
                                pg_cursor.execute(f"""
                                        SELECT 
setval('public.{conventional_seq_name}', COALESCE((SELECT MAX(id) FROM 
{table_name}), 0) + 1, false)
                                """)
                                all_sequences_reset.add(conventional_seq_name)
                
                pg_conn.commit()

        # Summary of sequences
        sequences_not_reset = all_sequences - all_sequences_reset
        
        print("\n" + "=" * 60)
        print("SEQUENCE SUMMARY")
        print("=" * 60)
        print(f"\nTotal sequences in database: {len(all_sequences)}")
        print(f"Sequences reset: {len(all_sequences_reset)}")
        print(f"Sequences NOT reset: {len(sequences_not_reset)}")
        
        if all_sequences_reset:
                print(f"\nSequences that were reset 
({len(all_sequences_reset)}):")
                for seq in sorted(all_sequences_reset):
                        print(f"  + {seq}")
        
        if sequences_not_reset:
                print(f"\nSequences that were NOT reset 
({len(sequences_not_reset)}):")
                for seq in sorted(sequences_not_reset):
                        print(f"  - {seq}")
                print("\nNote: These sequences may belong to tables not in 
SQLite or have non-standard naming.")
        
        print("\n" + "=" * 60)
        print("Migration complete.")
        print("=" * 60)

        sqlite_conn.close()
        pg_conn.close()

if __name__ == "__main__":
        if len(sys.argv) != 3:
                print("Usage: python main.py <sqlite_path> <pg_conn_string>")
                sys.exit(1)
        sqlite_path = sys.argv[1]
        pg_conn_string = sys.argv[2]
        upload_data_to_postgres(sqlite_path=sqlite_path, 
pg_conn_string=pg_conn_string)

```

GitHub link: 
https://github.com/apache/superset/discussions/17092#discussioncomment-15593920

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to