GitHub user sergem155 added a comment to the discussion: migrate database sqlite to postgres
Thanks for the inspiration. The script below works for Superset 6.0.0:
```
import sqlite3
import sys
import uuid
from collections import defaultdict

import psycopg2

# Tables to exclude from migration (managed by alembic or system tables)
EXCLUDED_TABLES = {"alembic_version"}


def get_tables_in_dependency_order(pg_cursor):
    """
    Get all tables from PostgreSQL and return them sorted in dependency order.
    Tables with no dependencies come first; tables that depend on others come
    later. This order is correct for INSERT operations. For DELETE operations,
    reverse this list.
    """
    # Get all tables in the public schema
    pg_cursor.execute("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
    """)
    all_tables = {row[0] for row in pg_cursor.fetchall()}

    # Get foreign key dependencies: child_table -> parent_table
    # (child_table has a FK pointing to parent_table)
    pg_cursor.execute("""
        SELECT
            tc.table_name AS child_table,
            ccu.table_name AS parent_table
        FROM information_schema.table_constraints tc
        JOIN information_schema.constraint_column_usage ccu
            ON tc.constraint_name = ccu.constraint_name
            AND tc.table_schema = ccu.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY'
            AND tc.table_schema = 'public'
            AND tc.table_name != ccu.table_name
    """)

    # Build the dependency graph: table -> set of tables it depends on (parents)
    dependencies = defaultdict(set)
    for child, parent in pg_cursor.fetchall():
        if child in all_tables and parent in all_tables:
            dependencies[child].add(parent)

    # Topological sort using Kahn's algorithm.
    # Calculate the in-degree (number of dependencies) for each table.
    in_degree = {table: len(dependencies[table]) for table in all_tables}

    # Start with tables that have no dependencies
    queue = [table for table in all_tables if in_degree[table] == 0]
    sorted_tables = []
    while queue:
        # Sort the queue for deterministic ordering
        queue.sort()
        table = queue.pop(0)
        sorted_tables.append(table)
        # For each table that depends on this one, reduce its in-degree
        for other_table in all_tables:
            if table in dependencies[other_table]:
                in_degree[other_table] -= 1
                if in_degree[other_table] == 0:
                    queue.append(other_table)

    # Check for circular dependencies
    if len(sorted_tables) != len(all_tables):
        remaining = all_tables - set(sorted_tables)
        print(f"WARNING: Circular dependencies detected involving tables: {remaining}")
        # Append the remaining tables at the end (may cause FK issues)
        sorted_tables.extend(sorted(remaining))

    return sorted_tables
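
# For intuition: if table B has a foreign key to A, and C has foreign keys to
# both A and B, the function above yields [A, B, C]; the delete phase below
# walks that list in reverse so child rows are removed before their parents.
# (A/B/C are illustrative names, not actual Superset tables.)
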
def upload_data_to_postgres(sqlite_path: str, pg_conn_string: str):
    sqlite_conn = sqlite3.connect(sqlite_path)
    pg_conn = psycopg2.connect(pg_conn_string)

    # Step 1: Verify alembic_version matches between SQLite and PostgreSQL
    sqlite_cursor = sqlite_conn.cursor()
    pg_cursor = pg_conn.cursor()

    sqlite_cursor.execute("SELECT version_num FROM alembic_version")
    sqlite_version_row = sqlite_cursor.fetchone()
    sqlite_version = sqlite_version_row[0] if sqlite_version_row else None

    pg_cursor.execute("SELECT version_num FROM alembic_version")
    pg_version_row = pg_cursor.fetchone()
    pg_version = pg_version_row[0] if pg_version_row else None

    print(f"SQLite alembic_version: {sqlite_version}")
    print(f"PostgreSQL alembic_version: {pg_version}")

    if sqlite_version != pg_version:
        print("\nERROR: alembic_version mismatch!")
        print(f"  SQLite:     {sqlite_version}")
        print(f"  PostgreSQL: {pg_version}")
        print("The database schemas must be at the same migration version before migrating data.")
        print("Migration aborted.")
        sqlite_conn.close()
        pg_conn.close()
        sys.exit(1)

    print("alembic_version check passed.\n")

    # Step 2: Get tables in dependency order from PostgreSQL
    print("Analyzing table dependencies...")
    all_pg_tables = get_tables_in_dependency_order(pg_cursor)
    tables_to_migrate = [t for t in all_pg_tables if t not in EXCLUDED_TABLES]

    print(f"Found {len(tables_to_migrate)} tables to migrate (in dependency order):\n")
    for i, t in enumerate(tables_to_migrate, 1):
        print(f"  {i:2}. {t}")
    print()

    # Step 3: Verify all SQLite tables are present in PostgreSQL
    sqlite_cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    )
    sqlite_tables = {row[0] for row in sqlite_cursor.fetchall()} - EXCLUDED_TABLES

    pg_tables_set = set(tables_to_migrate)
    missing_in_pg = sqlite_tables - pg_tables_set
    missing_in_sqlite = pg_tables_set - sqlite_tables

    if missing_in_pg:
        print("WARNING: Tables in SQLite but NOT in PostgreSQL (cannot migrate):")
        for t in sorted(missing_in_pg):
            print(f"  - {t}")
        print()

    if missing_in_sqlite:
        print("INFO: Tables in PostgreSQL but NOT in SQLite (will be emptied):")
        for t in sorted(missing_in_sqlite):
            print(f"  - {t}")
        print()

    # Only migrate tables that exist in both databases
    tables_to_migrate = [t for t in tables_to_migrate if t in sqlite_tables]

    # Step 4: Empty the target tables in reverse dependency order (children first)
    for table_name in reversed(tables_to_migrate):
        print("deleting " + table_name)
        pg_cursor.execute(f"DELETE FROM {table_name}")

    # Step 5: Copy rows table by table, in dependency order (parents first)
    for table_name in tables_to_migrate:
        print("fetching " + table_name)
        sqlite_cursor = sqlite_conn.cursor()
        sqlite_cursor.execute(f"SELECT * FROM {table_name}")
        rows = sqlite_cursor.fetchall()
        columns_types = (
            sqlite_conn.cursor().execute(f"PRAGMA table_info({table_name})").fetchall()
        )
        columns = ", ".join([f'"{desc[1]}"' for desc in columns_types])
        values = ", ".join(["%s"] * len(columns_types))
        query = f"INSERT INTO {table_name} ({columns}) VALUES ({values})"
        print("executing " + table_name)
        for row in rows:
            row_casted = []
            for i, cell in enumerate(row):
                if cell is None:
                    # Preserve NULLs regardless of the declared column type
                    row_casted.append(None)
                elif columns_types[i][2] == "BOOLEAN":
                    # SQLite stores booleans as 0/1 integers
                    row_casted.append(bool(cell))
                elif columns_types[i][2] == "NUMERIC(16)":
                    # NUMERIC(16) columns hold UUIDs stored as 16-byte blobs
                    row_casted.append(str(uuid.UUID(bytes=cell)))
                else:
                    row_casted.append(cell)
            pg_cursor.execute(query, row_casted)
    pg_conn.commit()
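
    # Note: the copy above inserts one row at a time; for a large metadata
    # database, psycopg2.extras.execute_values could batch the inserts, at
    # the cost of restructuring the per-cell type casting.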
    # Step 6: Reset the sequences behind auto-increment/serial columns in the
    # migrated tables. setval(seq, max + 1, false) makes the next nextval()
    # return max + 1, so new rows won't collide with migrated primary keys.
    # First, get all existing sequences in the database.
    pg_cursor.execute("""
        SELECT sequence_name FROM information_schema.sequences
        WHERE sequence_schema = 'public'
    """)
    all_sequences = {row[0] for row in pg_cursor.fetchall()}
    all_sequences_reset = set()

    for table_name in tables_to_migrate:
        print(f"checking sequences for {table_name}")

        # Method 1: Find sequences linked via pg_depend (deptype 'a' or 'i')
        pg_cursor.execute("""
            SELECT a.attname AS column_name,
                   s.relname AS seq_name,
                   n.nspname AS seq_schema
            FROM pg_class t
            JOIN pg_namespace tn ON t.relnamespace = tn.oid
            JOIN pg_depend d ON d.refobjid = t.oid
            JOIN pg_class s ON s.oid = d.objid
            JOIN pg_namespace n ON s.relnamespace = n.oid
            LEFT JOIN pg_attribute a
                ON a.attrelid = t.oid AND a.attnum = d.refobjsubid
            WHERE t.relname = %s
              AND tn.nspname = 'public'
              AND s.relkind = 'S'
              AND d.deptype IN ('a', 'i')
        """, (table_name,))
        columns_info = pg_cursor.fetchall()

        for column_name, seq_name, seq_schema in columns_info:
            if seq_name and column_name:
                print(f"  resetting sequence {seq_name} for "
                      f"{table_name}.{column_name} (via pg_depend)")
                pg_cursor.execute(f"""
                    SELECT setval('{seq_schema}.{seq_name}',
                        COALESCE((SELECT MAX("{column_name}") FROM {table_name}), 0) + 1,
                        false)
                """)
                all_sequences_reset.add(seq_name)

        # Method 2: Fallback - look for sequences named by the {table}_id_seq
        # convention. This handles sequences that aren't linked via pg_depend.
        conventional_seq_name = f"{table_name}_id_seq"
        if (conventional_seq_name in all_sequences
                and conventional_seq_name not in all_sequences_reset):
            # Check that the table actually has an 'id' column
            pg_cursor.execute("""
                SELECT column_name FROM information_schema.columns
                WHERE table_name = %s
                  AND table_schema = 'public'
                  AND column_name = 'id'
            """, (table_name,))
            if pg_cursor.fetchone():
                print(f"  resetting sequence {conventional_seq_name} for "
                      f"{table_name}.id (via naming convention)")
                pg_cursor.execute(f"""
                    SELECT setval('public.{conventional_seq_name}',
                        COALESCE((SELECT MAX(id) FROM {table_name}), 0) + 1,
                        false)
                """)
                all_sequences_reset.add(conventional_seq_name)

    pg_conn.commit()

    # Summary of sequences
    sequences_not_reset = all_sequences - all_sequences_reset
    print("\n" + "=" * 60)
    print("SEQUENCE SUMMARY")
    print("=" * 60)
    print(f"\nTotal sequences in database: {len(all_sequences)}")
    print(f"Sequences reset: {len(all_sequences_reset)}")
    print(f"Sequences NOT reset: {len(sequences_not_reset)}")

    if all_sequences_reset:
        print(f"\nSequences that were reset ({len(all_sequences_reset)}):")
        for seq in sorted(all_sequences_reset):
            print(f"  + {seq}")

    if sequences_not_reset:
        print(f"\nSequences that were NOT reset ({len(sequences_not_reset)}):")
        for seq in sorted(sequences_not_reset):
            print(f"  - {seq}")
        print("\nNote: These sequences may belong to tables not present in "
              "SQLite or have non-standard naming.")

    print("\n" + "=" * 60)
    print("Migration complete.")
    print("=" * 60)

    sqlite_conn.close()
    pg_conn.close()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python main.py <sqlite_path> <pg_conn_string>")
        sys.exit(1)
    sqlite_path = sys.argv[1]
    pg_conn_string = sys.argv[2]
    upload_data_to_postgres(sqlite_path=sqlite_path, pg_conn_string=pg_conn_string)
```
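For anyone else following along: the alembic_version check means the empty PostgreSQL schema has to exist first, e.g. point SQLALCHEMY_DATABASE_URI at Postgres and run `superset db upgrade`, then run this script against the old SQLite file. A hypothetical invocation (file name, path, and credentials are placeholders):
```
pip install psycopg2-binary
python migrate.py ~/.superset/superset.db "postgresql://superset:superset@localhost:5432/superset"
```
As a quick sanity check afterwards, something like this compares per-table row counts between the two databases (a sketch, reusing the same two CLI arguments as the migration script):
```
import sqlite3
import sys

import psycopg2

# Compare per-table row counts between the source SQLite file and the
# target PostgreSQL database after the migration.
sqlite_conn = sqlite3.connect(sys.argv[1])
pg_conn = psycopg2.connect(sys.argv[2])
sqlite_cur = sqlite_conn.cursor()
pg_cur = pg_conn.cursor()

sqlite_cur.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
tables = sorted(row[0] for row in sqlite_cur.fetchall())

for table in tables:
    sqlite_cur.execute(f"SELECT COUNT(*) FROM {table}")
    sqlite_count = sqlite_cur.fetchone()[0]
    try:
        pg_cur.execute(f"SELECT COUNT(*) FROM {table}")
        pg_count = pg_cur.fetchone()[0]
    except psycopg2.Error:
        pg_conn.rollback()  # clear the aborted transaction before continuing
        print(f"{table}: missing in PostgreSQL")
        continue
    status = "OK" if sqlite_count == pg_count else "MISMATCH"
    print(f"{table}: sqlite={sqlite_count} postgres={pg_count} {status}")

sqlite_conn.close()
pg_conn.close()
```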
GitHub link:
https://github.com/apache/superset/discussions/17092#discussioncomment-15593920