ktmud commented on a change in pull request #19421:
URL: https://github.com/apache/superset/pull/19421#discussion_r840092583



##########
File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py
##########
@@ -207,427 +244,557 @@ class NewTable(Base):
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=table_column_association_table, cascade="all, 
delete"
     )
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, 
default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
-class NewDataset(Base):
+class NewDataset(Base, AuxiliaryColumnsMixin):
 
     __tablename__ = "sl_datasets"
 
     id = sa.Column(sa.Integer, primary_key=True)
     sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
     name = sa.Column(sa.Text)
-    expression = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(sa.Text, default="{}")
     tables: List[NewTable] = relationship(
         "NewTable", secondary=dataset_table_association_table
     )
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=dataset_column_association_table, cascade="all, 
delete"
     )
-    is_physical = sa.Column(sa.Boolean, default=False)
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
 TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
 
 
-def load_or_create_tables(
+def find_tables(
     session: Session,
     database_id: int,
     default_schema: Optional[str],
     tables: Set[Table],
-    conditional_quote: Callable[[str], str],
-) -> List[NewTable]:
+) -> List[int]:
     """
-    Load or create new table model instances.
+    Look up NewTable records from a specific database.
     """
     if not tables:
         return []
 
-    # set the default schema in tables that don't have it
-    if default_schema:
-        tables = list(tables)
-        for i, table in enumerate(tables):
-            if table.schema is None:
-                tables[i] = Table(table.table, default_schema, table.catalog)
-
-    # load existing tables
     predicate = or_(
         *[
             and_(
                 NewTable.database_id == database_id,
-                NewTable.schema == table.schema,
+                NewTable.schema == (table.schema or default_schema),
                 NewTable.name == table.table,
             )
             for table in tables
         ]
     )
-    new_tables = session.query(NewTable).filter(predicate).all()
-
-    # use original database model to get the engine
-    engine = (
-        session.query(OriginalDatabase)
-        .filter_by(id=database_id)
-        .one()
-        .get_sqla_engine(default_schema)
-    )
-    inspector = inspect(engine)
-
-    # add missing tables
-    existing = {(table.schema, table.name) for table in new_tables}
-    for table in tables:
-        if (table.schema, table.table) not in existing:
-            column_metadata = inspector.get_columns(table.table, schema=table.schema)
-            columns = [
-                NewColumn(
-                    name=column["name"],
-                    type=str(column["type"]),
-                    expression=conditional_quote(column["name"]),
-                    is_temporal=column["type"].python_type.__name__.upper()
-                    in TEMPORAL_TYPES,
-                    is_aggregation=False,
-                    is_physical=True,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                )
-                for column in column_metadata
-            ]
-            new_tables.append(
-                NewTable(
-                    name=table.table,
-                    schema=table.schema,
-                    catalog=None,
-                    database_id=database_id,
-                    columns=columns,
-                )
-            )
-            existing.add((table.schema, table.table))
+    return session.query(NewTable.id).filter(predicate).all()
 
-    return new_tables
 
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")
 
-def after_insert(target: SqlaTable) -> None:  # pylint: disable=too-many-locals
-    """
-    Copy old datasets to the new models.
-    """
-    session = inspect(target).session
+# keep only columns and metrics that have a valid associated SqlaTable
+active_table_columns = sa.join(
+    TableColumn,
+    SqlaTable,
+    and_(
+        TableColumn.table_id == SqlaTable.id,
+        TableColumn.is_active,
+    ),
+)
+active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id)
 
-    # get DB-specific conditional quoter for expressions that point to columns or
-    # table names
-    database = (
-        target.database
-        or session.query(Database).filter_by(id=target.database_id).first()
-    )
-    if not database:
-        return
-    url = make_url(database.sqlalchemy_uri)
-    dialect_class = url.get_dialect()
-    conditional_quote = dialect_class().identifier_preparer.quote
-
-    # create columns
-    columns = []
-    for column in target.columns:
-        # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-        if column.is_active is False:
-            continue
-
-        try:
-            extra_json = json.loads(column.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"groupby", "filterable", "verbose_name", 
"python_date_format"}:
-            value = getattr(column, attr)
-            if value:
-                extra_json[attr] = value
-
-        columns.append(
-            NewColumn(
-                name=column.column_name,
-                type=column.type or "Unknown",
-                expression=column.expression or conditional_quote(column.column_name),
-                description=column.description,
-                is_temporal=column.is_dttm,
-                is_aggregation=False,
-                is_physical=column.expression is None or column.expression == "",
-                is_spatial=False,
-                is_partition=False,
-                is_increase_desired=True,
-                extra_json=json.dumps(extra_json) if extra_json else None,
-                is_managed_externally=target.is_managed_externally,
-                external_url=target.external_url,
-            ),
-        )
 
-    # create metrics
-    for metric in target.metrics:
-        try:
-            extra_json = json.loads(metric.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"verbose_name", "metric_type", "d3format"}:
-            value = getattr(metric, attr)
-            if value:
-                extra_json[attr] = value
-
-        is_additive = (
-            metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
+def copy_tables(session: Session) -> None:
+    """Copy Physical tables"""
+    count = session.query(SqlaTable).filter(is_physical_table).count()
+    print(f">> Copy {count:,} physical tables to `sl_tables`...")
+    insert_from_select(
+        "sl_tables",
+        select(
+            [
+                SqlaTable.id,
+                SqlaTable.uuid,
+                SqlaTable.created_on,
+                SqlaTable.changed_on,
+                SqlaTable.created_by_fk,
+                SqlaTable.changed_by_fk,
+                SqlaTable.table_name.label("name"),
+                SqlaTable.schema,
+                SqlaTable.database_id,
+                SqlaTable.is_managed_externally,
+                SqlaTable.external_url,
+            ]
         )
+        # use an inner join to keep only tables with a valid database id
+        .select_from(
+            sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id)
+        ).where(is_physical_table),
+    )
 
-        columns.append(
-            NewColumn(
-                name=metric.metric_name,
-                type="Unknown",  # figuring this out would require a type 
inferrer
-                expression=metric.expression,
-                warning_text=metric.warning_text,
-                description=metric.description,
-                is_aggregation=True,
-                is_additive=is_additive,
-                is_physical=False,
-                is_spatial=False,
-                is_partition=False,
-                is_increase_desired=True,
-                extra_json=json.dumps(extra_json) if extra_json else None,
-                is_managed_externally=target.is_managed_externally,
-                external_url=target.external_url,
-            ),
-        )
 
-    # physical dataset
-    if not target.sql:
-        physical_columns = [column for column in columns if column.is_physical]
-
-        # create table
-        table = NewTable(
-            name=target.table_name,
-            schema=target.schema,
-            catalog=None,  # currently not supported
-            database_id=target.database_id,
-            columns=physical_columns,
-            is_managed_externally=target.is_managed_externally,
-            external_url=target.external_url,
-        )
-        tables = [table]
-
-    # virtual dataset
-    else:
-        # mark all columns as virtual (not physical)
-        for column in columns:
-            column.is_physical = False
-
-        # find referenced tables
-        referenced_tables = extract_table_references(target.sql, dialect_class.name)
-        tables = load_or_create_tables(
-            session,
-            target.database_id,
-            target.schema,
-            referenced_tables,
-            conditional_quote,
-        )
+def copy_datasets(session: Session) -> None:
+    """Copy all datasets"""
+    count = session.query(SqlaTable).count()
+    if not count:
+        return
+    print(f">> Copy {count:,} SqlaTable to `sl_datasets`...")
+    insert_from_select(
+        "sl_datasets",
+        select(
+            [
+                # keep the ids the same for easier migration of relationships
+                SqlaTable.id,
+                SqlaTable.uuid,
+                SqlaTable.created_on,
+                SqlaTable.changed_on,
+                SqlaTable.created_by_fk,
+                SqlaTable.changed_by_fk,
+                SqlaTable.id.label("sqlatable_id"),
+                SqlaTable.table_name.label("name"),
+                func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"),
+                is_physical_table.label("is_physical"),
+                SqlaTable.is_managed_externally,
+                SqlaTable.external_url,
+                SqlaTable.extra.label("extra_json"),
+            ]
+        ),
+    )
 
-    # create the new dataset
-    dataset = NewDataset(
-        sqlatable_id=target.id,
-        name=target.table_name,
-        expression=target.sql or conditional_quote(target.table_name),
-        tables=tables,
-        columns=columns,
-        is_physical=not target.sql,
-        is_managed_externally=target.is_managed_externally,
-        external_url=target.external_url,
+    print("   Link physical datasets with tables...")
+    # Physical datasets (tables) have the same dataset.id and table.id
+    # as both are from SqlaTable.id
+    insert_from_select(
+        "sl_dataset_tables",
+        select(
+            [
+                NewTable.id.label("dataset_id"),
+                NewTable.id.label("table_id"),
+            ]
+        ),
     )
-    session.add(dataset)
 
 
-def upgrade():
-    # Create tables for the new models.
-    op.create_table(
+def copy_columns(session: Session) -> None:
+    """Copy columns with active associated SqlTable"""
+    count = session.query(TableColumn).select_from(active_table_columns).count()
+    if not count:
+        return
+    print(f">> Copy {count:,} active table columns to `sl_columns`...")
+    insert_from_select(
         "sl_columns",
-        # AuditMixinNullable
-        sa.Column("created_on", sa.DateTime(), nullable=True),
-        sa.Column("changed_on", sa.DateTime(), nullable=True),
-        sa.Column("created_by_fk", sa.Integer(), nullable=True),
-        sa.Column("changed_by_fk", sa.Integer(), nullable=True),
-        # ExtraJSONMixin
-        sa.Column("extra_json", sa.Text(), nullable=True),
-        # ImportExportMixin
-        sa.Column("uuid", UUIDType(binary=True), primary_key=False, 
default=uuid4),
-        # Column
-        sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
-        sa.Column("name", sa.TEXT(), nullable=False),
-        sa.Column("type", sa.TEXT(), nullable=False),
-        sa.Column("expression", sa.TEXT(), nullable=False),
-        sa.Column(
-            "is_physical",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=True,
-        ),
-        sa.Column("description", sa.TEXT(), nullable=True),
-        sa.Column("warning_text", sa.TEXT(), nullable=True),
-        sa.Column("unit", sa.TEXT(), nullable=True),
-        sa.Column("is_temporal", sa.BOOLEAN(), nullable=False),
-        sa.Column(
-            "is_spatial",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=False,
-        ),
-        sa.Column(
-            "is_partition",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=False,
-        ),
-        sa.Column(
-            "is_aggregation",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=False,
-        ),
-        sa.Column(
-            "is_additive",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=False,
-        ),
-        sa.Column(
-            "is_increase_desired",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=True,
-        ),
-        sa.Column(
-            "is_managed_externally",
-            sa.Boolean(),
-            nullable=False,
-            server_default=sa.false(),
-        ),
-        sa.Column("external_url", sa.Text(), nullable=True),
-        sa.PrimaryKeyConstraint("id"),
+        select(
+            [
+                # keep the same column.id so later relationships can be added more easily
+                TableColumn.id,
+                TableColumn.uuid,
+                TableColumn.created_on,
+                TableColumn.changed_on,
+                TableColumn.created_by_fk,
+                TableColumn.changed_by_fk,
+                TableColumn.column_name.label("name"),
+                TableColumn.description,
+                func.coalesce(TableColumn.expression, TableColumn.column_name).label(
+                    "expression"
+                ),
+                sa.literal(False).label("is_aggregation"),
+                or_(
+                    TableColumn.expression.is_(None), (TableColumn.expression == "")
+                ).label("is_physical"),
+                TableColumn.is_dttm.label("is_temporal"),
+                func.coalesce(TableColumn.type, "Unknown").label("type"),
+                TableColumn.extra.label("extra_json"),
+            ]
+        ).select_from(active_table_columns),
     )
-    with op.batch_alter_table("sl_columns") as batch_op:
-        batch_op.create_unique_constraint("uq_sl_columns_uuid", ["uuid"])
 
-    op.create_table(
-        "sl_tables",
-        # AuditMixinNullable
-        sa.Column("created_on", sa.DateTime(), nullable=True),
-        sa.Column("changed_on", sa.DateTime(), nullable=True),
-        sa.Column("created_by_fk", sa.Integer(), nullable=True),
-        sa.Column("changed_by_fk", sa.Integer(), nullable=True),
-        # ExtraJSONMixin
-        sa.Column("extra_json", sa.Text(), nullable=True),
-        # ImportExportMixin
-        sa.Column("uuid", UUIDType(binary=True), primary_key=False, 
default=uuid4),
-        # Table
-        sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
-        sa.Column("database_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.Column("catalog", sa.TEXT(), nullable=True),
-        sa.Column("schema", sa.TEXT(), nullable=True),
-        sa.Column("name", sa.TEXT(), nullable=False),
-        sa.Column(
-            "is_managed_externally",
-            sa.Boolean(),
-            nullable=False,
-            server_default=sa.false(),
-        ),
-        sa.Column("external_url", sa.Text(), nullable=True),
-        sa.ForeignKeyConstraint(["database_id"], ["dbs.id"], 
name="sl_tables_ibfk_1"),
-        sa.PrimaryKeyConstraint("id"),
+    print("   Link physical table columns to `sl_tables`...")
+    insert_from_select(
+        "sl_table_columns",
+        select(
+            [
+                TableColumn.table_id,
+                TableColumn.id.label("column_id"),
+            ]
+        )
+        .select_from(active_table_columns)
+        .where(is_physical_table),
     )
-    with op.batch_alter_table("sl_tables") as batch_op:
-        batch_op.create_unique_constraint("uq_sl_tables_uuid", ["uuid"])
 
-    op.create_table(
-        "sl_table_columns",
-        sa.Column("table_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.Column("column_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.ForeignKeyConstraint(
-            ["column_id"], ["sl_columns.id"], name="sl_table_columns_ibfk_2"
-        ),
-        sa.ForeignKeyConstraint(
-            ["table_id"], ["sl_tables.id"], name="sl_table_columns_ibfk_1"
-        ),
+    print("   Link all columns to `sl_datasets`...")
+    insert_from_select(
+        "sl_dataset_columns",
+        select(
+            [
+                TableColumn.table_id.label("dataset_id"),
+                TableColumn.id.label("column_id"),
+            ],
+        ).select_from(active_table_columns),
     )
 
-    op.create_table(
-        "sl_datasets",
-        # AuditMixinNullable
-        sa.Column("created_on", sa.DateTime(), nullable=True),
-        sa.Column("changed_on", sa.DateTime(), nullable=True),
-        sa.Column("created_by_fk", sa.Integer(), nullable=True),
-        sa.Column("changed_by_fk", sa.Integer(), nullable=True),
-        # ExtraJSONMixin
-        sa.Column("extra_json", sa.Text(), nullable=True),
-        # ImportExportMixin
-        sa.Column("uuid", UUIDType(binary=True), primary_key=False, 
default=uuid4),
-        # Dataset
-        sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
-        sa.Column("sqlatable_id", sa.INTEGER(), nullable=True),
-        sa.Column("name", sa.TEXT(), nullable=False),
-        sa.Column("expression", sa.TEXT(), nullable=False),
-        sa.Column(
-            "is_physical",
-            sa.BOOLEAN(),
-            nullable=False,
-            default=False,
-        ),
-        sa.Column(
-            "is_managed_externally",
-            sa.Boolean(),
-            nullable=False,
-            server_default=sa.false(),
-        ),
-        sa.Column("external_url", sa.Text(), nullable=True),
-        sa.PrimaryKeyConstraint("id"),
+
+def copy_metrics(session: Session) -> None:
+    """Copy metrics as virtual columns"""
+    metrics_count = session.query(SqlMetric).select_from(active_metrics).count()
+    if not metrics_count:
+        return
+    # offset metric column ids by the last id of table columns
+    id_offset = session.query(func.max(NewColumn.id)).scalar()
+
+    print(f">> Copy {metrics_count:,} metrics to `sl_columns`...")
+    insert_from_select(
+        "sl_columns",
+        select(
+            [
+                (SqlMetric.id + id_offset).label("id"),
+                SqlMetric.uuid,
+                SqlMetric.created_on,
+                SqlMetric.changed_on,
+                SqlMetric.created_by_fk,
+                SqlMetric.changed_by_fk,
+                SqlMetric.metric_name.label("name"),
+                SqlMetric.expression,
+                SqlMetric.description,
+                sa.literal("Unknown").label("type"),
+                (
+                    sa.func.lower(SqlMetric.metric_type)
+                    .in_(ADDITIVE_METRIC_TYPES_LOWER)
+                    .label("is_additive")
+                ),
+                sa.literal(False).label("is_physical"),
+                sa.literal(False).label("is_temporal"),
+                sa.literal(True).label("is_aggregation"),
+                SqlMetric.extra.label("extra_json"),
+                SqlMetric.warning_text,
+            ]
+        ).select_from(active_metrics),
     )
-    with op.batch_alter_table("sl_datasets") as batch_op:
-        batch_op.create_unique_constraint("uq_sl_datasets_uuid", ["uuid"])
-        batch_op.create_unique_constraint(
-            "uq_sl_datasets_sqlatable_id", ["sqlatable_id"]
-        )
 
-    op.create_table(
+    print("   Link metric columns to datasets...")
+    insert_from_select(
         "sl_dataset_columns",
-        sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.Column("column_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.ForeignKeyConstraint(
-            ["column_id"], ["sl_columns.id"], name="sl_dataset_columns_ibfk_2"
-        ),
-        sa.ForeignKeyConstraint(
-            ["dataset_id"], ["sl_datasets.id"], 
name="sl_dataset_columns_ibfk_1"
-        ),
+        select(
+            [
+                SqlMetric.table_id.label("dataset_id"),
+                (SqlMetric.id + id_offset).label("column_id"),
+            ],
+        ).select_from(active_metrics),
     )
 
-    op.create_table(
-        "sl_dataset_tables",
-        sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.Column("table_id", sa.INTEGER(), autoincrement=False, 
nullable=False),
-        sa.ForeignKeyConstraint(
-            ["dataset_id"], ["sl_datasets.id"], name="sl_dataset_tables_ibfk_1"
-        ),
-        sa.ForeignKeyConstraint(
-            ["table_id"], ["sl_tables.id"], name="sl_dataset_tables_ibfk_2"
-        ),
+
+def postprocess_datasets(session: Session) -> None:
+    """
+    Postprocess datasets after insertion to
+      - Quote table names for physical datasets (if needed)
+      - Link referenced tables to virtual datasets
+    """
+    total = session.query(SqlaTable).count()
+    if not total:
+        return
+
+    offset = 0
+    limit = 10000
+
+    joined_tables = sa.join(
+        NewDataset,
+        SqlaTable,
+        NewDataset.sqlatable_id == SqlaTable.id,
+    ).join(
+        Database,
+        Database.id == SqlaTable.database_id,
+        isouter=True,
     )
+    assert session.query(func.count()).select_from(joined_tables).scalar() == total
 
-    # migrate existing datasets to the new models
-    bind = op.get_bind()
-    session = db.Session(bind=bind)  # pylint: disable=no-member
+    print(f">> Run postprocessing on {total} datasets")
+
+    update_count = 0
+
+    def print_update_count():
+        if SHOW_PROGRESS:
+            print(
+                f"   Will update {update_count} datasets" + " " * 20,
+                end="\r",
+            )
+
+    while offset < total:
+        if SHOW_PROGRESS:
+            print(
+                f"   Postprocess dataset {offset + 1}~{min(total, offset + 
limit)}..."
+                + " " * 30
+            )
+        for (
+            dataset_id,
+            is_physical,
+            expression,
+            database_id,
+            schema,
+            sqlalchemy_uri,
+        ) in session.execute(
+            select(
+                [
+                    NewDataset.id,
+                    NewDataset.is_physical,
+                    NewDataset.expression,
+                    SqlaTable.database_id,
+                    SqlaTable.schema,
+                    Database.sqlalchemy_uri,
+                ]
+            )
+            .select_from(joined_tables)
+            .offset(offset)
+            .limit(limit)
+        ):
+            drivername = (sqlalchemy_uri or "").split("://")[0]
+            if is_physical and drivername:
+                quoted_expression = get_identifier_quoter(drivername)(expression)
+                if quoted_expression != expression:
+                    session.execute(
+                        sa.update(NewDataset)
+                        .where(NewDataset.id == dataset_id)
+                        .values(expression=quoted_expression)
+                    )
+                    update_count += 1
+                    print_update_count()
+            elif not is_physical and expression:
+                table_refrences = extract_table_references(
+                    expression, get_dialect_name(drivername), show_warning=False
+                )

Review comment:
       This table reference extraction should probably live in a separate 
process/script as well. Removing it here would cut at least another 30 minutes of 
migration time for us.
   
   @betodealmeida will this info be used any time soon? Or should we design for 
it later, when we actually want to build features around it? There will be some 
manual syncing again anyway once users update their SQL queries, and this 
extraction also doesn't handle SQL queries that use Jinja templating well.
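
   For reference, a rough sketch of what that separate backfill could look like, assuming it can reuse the helpers already defined in this migration (`extract_table_references`, `get_dialect_name`, `find_tables`, `dataset_table_association_table`); the `link_tables_for_virtual_datasets` name and the standalone framing are only illustrative:

```python
# Hypothetical offline backfill (illustrative only): link referenced tables to
# virtual datasets outside of the Alembic migration, reusing helpers from this
# module. Assumes these helpers are importable from (or copied out of) the
# migration file.
import sqlalchemy as sa
from sqlalchemy import select
from sqlalchemy.orm import Session


def link_tables_for_virtual_datasets(session: Session) -> None:
    joined = sa.join(
        NewDataset, SqlaTable, NewDataset.sqlatable_id == SqlaTable.id
    ).join(Database, Database.id == SqlaTable.database_id)
    rows = session.execute(
        select(
            [
                NewDataset.id,
                NewDataset.expression,
                SqlaTable.database_id,
                SqlaTable.schema,
                Database.sqlalchemy_uri,
            ]
        )
        .select_from(joined)
        .where(NewDataset.is_physical.is_(False))
    )
    for dataset_id, expression, database_id, schema, uri in rows:
        drivername = (uri or "").split("://")[0]
        table_refs = extract_table_references(
            expression, get_dialect_name(drivername), show_warning=False
        )
        # find_tables returns a list of (id,) rows for existing NewTable records
        table_ids = find_tables(session, database_id, schema, table_refs)
        if table_ids:
            session.execute(
                dataset_table_association_table.insert(),
                [
                    {"dataset_id": dataset_id, "table_id": table_id}
                    for (table_id,) in table_ids
                ],
            )
    session.commit()
```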

##########
File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py
##########
@@ -207,427 +244,557 @@ class NewTable(Base):
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=table_column_association_table, cascade="all, 
delete"
     )
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, 
default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
-class NewDataset(Base):
+class NewDataset(Base, AuxiliaryColumnsMixin):
 
     __tablename__ = "sl_datasets"
 
     id = sa.Column(sa.Integer, primary_key=True)
     sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
     name = sa.Column(sa.Text)
-    expression = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(sa.Text, default="{}")
     tables: List[NewTable] = relationship(
         "NewTable", secondary=dataset_table_association_table
     )
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=dataset_column_association_table, cascade="all, 
delete"
     )
-    is_physical = sa.Column(sa.Boolean, default=False)
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
 TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
 
 
-def load_or_create_tables(
+def find_tables(
     session: Session,
     database_id: int,
     default_schema: Optional[str],
     tables: Set[Table],
-    conditional_quote: Callable[[str], str],
-) -> List[NewTable]:
+) -> List[int]:
     """
-    Load or create new table model instances.
+    Look up NewTable records from a specific database.
     """
     if not tables:
         return []
 
-    # set the default schema in tables that don't have it
-    if default_schema:
-        tables = list(tables)
-        for i, table in enumerate(tables):
-            if table.schema is None:
-                tables[i] = Table(table.table, default_schema, table.catalog)
-
-    # load existing tables
     predicate = or_(
         *[
             and_(
                 NewTable.database_id == database_id,
-                NewTable.schema == table.schema,
+                NewTable.schema == (table.schema or default_schema),
                 NewTable.name == table.table,
             )
             for table in tables
         ]
     )
-    new_tables = session.query(NewTable).filter(predicate).all()
-
-    # use original database model to get the engine
-    engine = (
-        session.query(OriginalDatabase)
-        .filter_by(id=database_id)
-        .one()
-        .get_sqla_engine(default_schema)
-    )
-    inspector = inspect(engine)
-
-    # add missing tables
-    existing = {(table.schema, table.name) for table in new_tables}
-    for table in tables:
-        if (table.schema, table.table) not in existing:
-            column_metadata = inspector.get_columns(table.table, schema=table.schema)
-            columns = [
-                NewColumn(
-                    name=column["name"],
-                    type=str(column["type"]),
-                    expression=conditional_quote(column["name"]),
-                    is_temporal=column["type"].python_type.__name__.upper()
-                    in TEMPORAL_TYPES,
-                    is_aggregation=False,
-                    is_physical=True,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                )
-                for column in column_metadata
-            ]
-            new_tables.append(
-                NewTable(
-                    name=table.table,
-                    schema=table.schema,
-                    catalog=None,
-                    database_id=database_id,
-                    columns=columns,
-                )
-            )
-            existing.add((table.schema, table.table))
+    return session.query(NewTable.id).filter(predicate).all()
 
-    return new_tables
 
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")
 
-def after_insert(target: SqlaTable) -> None:  # pylint: disable=too-many-locals
-    """
-    Copy old datasets to the new models.
-    """
-    session = inspect(target).session
+# keep only columns and metrics that have a valid associated SqlaTable
+active_table_columns = sa.join(
+    TableColumn,
+    SqlaTable,
+    and_(
+        TableColumn.table_id == SqlaTable.id,
+        TableColumn.is_active,
+    ),
+)
+active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id)
 
-    # get DB-specific conditional quoter for expressions that point to columns or
-    # table names
-    database = (
-        target.database
-        or session.query(Database).filter_by(id=target.database_id).first()
-    )
-    if not database:
-        return
-    url = make_url(database.sqlalchemy_uri)
-    dialect_class = url.get_dialect()
-    conditional_quote = dialect_class().identifier_preparer.quote
-
-    # create columns
-    columns = []
-    for column in target.columns:
-        # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-        if column.is_active is False:
-            continue
-
-        try:
-            extra_json = json.loads(column.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"groupby", "filterable", "verbose_name", 
"python_date_format"}:
-            value = getattr(column, attr)
-            if value:
-                extra_json[attr] = value
-
-        columns.append(
-            NewColumn(
-                name=column.column_name,
-                type=column.type or "Unknown",
-                expression=column.expression or conditional_quote(column.column_name),
-                description=column.description,
-                is_temporal=column.is_dttm,
-                is_aggregation=False,
-                is_physical=column.expression is None or column.expression == "",
-                is_spatial=False,
-                is_partition=False,
-                is_increase_desired=True,
-                extra_json=json.dumps(extra_json) if extra_json else None,
-                is_managed_externally=target.is_managed_externally,
-                external_url=target.external_url,
-            ),
-        )
 
-    # create metrics
-    for metric in target.metrics:
-        try:
-            extra_json = json.loads(metric.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"verbose_name", "metric_type", "d3format"}:
-            value = getattr(metric, attr)
-            if value:
-                extra_json[attr] = value
-
-        is_additive = (
-            metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
+def copy_tables(session: Session) -> None:
+    """Copy Physical tables"""
+    count = session.query(SqlaTable).filter(is_physical_table).count()
+    print(f">> Copy {count:,} physical tables to `sl_tables`...")
+    insert_from_select(
+        "sl_tables",
+        select(
+            [
+                SqlaTable.id,

Review comment:
       I'm porting over the same `id`, `uuid`, `created_on`, and `changed_on` from 
the original tables so that relationship mapping is easier. As the new tables are 
intended to fully replace the original ones, retaining this information is also 
useful for the end-user experience (especially `changed_on` and `created_on`).

##########
File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py
##########
@@ -207,427 +244,557 @@ class NewTable(Base):
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=table_column_association_table, cascade="all, 
delete"
     )
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, 
default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
-class NewDataset(Base):
+class NewDataset(Base, AuxiliaryColumnsMixin):
 
     __tablename__ = "sl_datasets"
 
     id = sa.Column(sa.Integer, primary_key=True)
     sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
     name = sa.Column(sa.Text)
-    expression = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(sa.Text, default="{}")
     tables: List[NewTable] = relationship(
         "NewTable", secondary=dataset_table_association_table
     )
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=dataset_column_association_table, cascade="all, 
delete"
     )
-    is_physical = sa.Column(sa.Boolean, default=False)
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
 TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
 
 
-def load_or_create_tables(
+def find_tables(
     session: Session,
     database_id: int,
     default_schema: Optional[str],
     tables: Set[Table],
-    conditional_quote: Callable[[str], str],
-) -> List[NewTable]:
+) -> List[int]:
     """
-    Load or create new table model instances.
+    Look up NewTable records from a specific database.
     """
     if not tables:
         return []
 
-    # set the default schema in tables that don't have it
-    if default_schema:
-        tables = list(tables)
-        for i, table in enumerate(tables):
-            if table.schema is None:
-                tables[i] = Table(table.table, default_schema, table.catalog)
-
-    # load existing tables
     predicate = or_(
         *[
             and_(
                 NewTable.database_id == database_id,
-                NewTable.schema == table.schema,
+                NewTable.schema == (table.schema or default_schema),
                 NewTable.name == table.table,
             )
             for table in tables
         ]
     )
-    new_tables = session.query(NewTable).filter(predicate).all()
-
-    # use original database model to get the engine
-    engine = (
-        session.query(OriginalDatabase)
-        .filter_by(id=database_id)
-        .one()
-        .get_sqla_engine(default_schema)
-    )
-    inspector = inspect(engine)
-
-    # add missing tables
-    existing = {(table.schema, table.name) for table in new_tables}
-    for table in tables:
-        if (table.schema, table.table) not in existing:
-            column_metadata = inspector.get_columns(table.table, schema=table.schema)
-            columns = [
-                NewColumn(
-                    name=column["name"],
-                    type=str(column["type"]),
-                    expression=conditional_quote(column["name"]),
-                    is_temporal=column["type"].python_type.__name__.upper()
-                    in TEMPORAL_TYPES,
-                    is_aggregation=False,
-                    is_physical=True,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                )
-                for column in column_metadata
-            ]
-            new_tables.append(
-                NewTable(
-                    name=table.table,
-                    schema=table.schema,
-                    catalog=None,
-                    database_id=database_id,
-                    columns=columns,
-                )
-            )
-            existing.add((table.schema, table.table))
+    return session.query(NewTable.id).filter(predicate).all()
 
-    return new_tables
 
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")
 
-def after_insert(target: SqlaTable) -> None:  # pylint: disable=too-many-locals
-    """
-    Copy old datasets to the new models.
-    """
-    session = inspect(target).session
+# keep only columns and metrics that have a valid associated SqlaTable
+active_table_columns = sa.join(
+    TableColumn,
+    SqlaTable,
+    and_(
+        TableColumn.table_id == SqlaTable.id,
+        TableColumn.is_active,
+    ),
+)
+active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id)
 
-    # get DB-specific conditional quoter for expressions that point to columns or
-    # table names
-    database = (
-        target.database
-        or session.query(Database).filter_by(id=target.database_id).first()
-    )
-    if not database:
-        return
-    url = make_url(database.sqlalchemy_uri)
-    dialect_class = url.get_dialect()
-    conditional_quote = dialect_class().identifier_preparer.quote
-
-    # create columns
-    columns = []
-    for column in target.columns:
-        # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-        if column.is_active is False:
-            continue
-
-        try:
-            extra_json = json.loads(column.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"groupby", "filterable", "verbose_name", 
"python_date_format"}:
-            value = getattr(column, attr)
-            if value:
-                extra_json[attr] = value
-
-        columns.append(
-            NewColumn(
-                name=column.column_name,
-                type=column.type or "Unknown",
-                expression=column.expression or conditional_quote(column.column_name),
-                description=column.description,
-                is_temporal=column.is_dttm,
-                is_aggregation=False,
-                is_physical=column.expression is None or column.expression == "",
-                is_spatial=False,
-                is_partition=False,
-                is_increase_desired=True,
-                extra_json=json.dumps(extra_json) if extra_json else None,
-                is_managed_externally=target.is_managed_externally,
-                external_url=target.external_url,
-            ),
-        )
 
-    # create metrics
-    for metric in target.metrics:
-        try:
-            extra_json = json.loads(metric.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"verbose_name", "metric_type", "d3format"}:
-            value = getattr(metric, attr)
-            if value:
-                extra_json[attr] = value
-
-        is_additive = (
-            metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
+def copy_tables(session: Session) -> None:
+    """Copy Physical tables"""
+    count = session.query(SqlaTable).filter(is_physical_table).count()
+    print(f">> Copy {count:,} physical tables to `sl_tables`...")
+    insert_from_select(
+        "sl_tables",
+        select(
+            [
+                SqlaTable.id,
+                SqlaTable.uuid,
+                SqlaTable.created_on,
+                SqlaTable.changed_on,
+                SqlaTable.created_by_fk,
+                SqlaTable.changed_by_fk,

Review comment:
       The previous migration did not copy the values of these columns to the new 
tables. I think it'd be useful to retain them, especially the properties from 
AuditMixin.
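
   A quick spot check after the copy could look like the following (illustrative only; it assumes the legacy SqlaTable model lives in the `tables` table, and NULL audit values are simply ignored by the inequality):

```python
import sqlalchemy as sa
from sqlalchemy.orm import Session


def check_audit_columns(session: Session) -> None:
    # Count copied rows whose audit timestamps no longer match the source rows.
    drifted = session.execute(
        sa.text(
            """
            SELECT COUNT(*)
            FROM tables AS src
            JOIN sl_tables AS dst ON dst.id = src.id
            WHERE dst.created_on != src.created_on
               OR dst.changed_on != src.changed_on
            """
        )
    ).scalar()
    assert drifted == 0, "audit columns should be copied verbatim"
```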

##########
File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py
##########
@@ -207,427 +244,557 @@ class NewTable(Base):
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=table_column_association_table, cascade="all, 
delete"
     )
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, 
default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
-class NewDataset(Base):
+class NewDataset(Base, AuxiliaryColumnsMixin):
 
     __tablename__ = "sl_datasets"
 
     id = sa.Column(sa.Integer, primary_key=True)
     sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
     name = sa.Column(sa.Text)
-    expression = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(sa.Text, default="{}")
     tables: List[NewTable] = relationship(
         "NewTable", secondary=dataset_table_association_table
     )
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=dataset_column_association_table, cascade="all, 
delete"
     )
-    is_physical = sa.Column(sa.Boolean, default=False)
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
 TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
 
 
-def load_or_create_tables(
+def find_tables(
     session: Session,
     database_id: int,
     default_schema: Optional[str],
     tables: Set[Table],
-    conditional_quote: Callable[[str], str],
-) -> List[NewTable]:
+) -> List[int]:
     """
-    Load or create new table model instances.
+    Look up NewTable records from a specific database.
     """
     if not tables:
         return []
 
-    # set the default schema in tables that don't have it
-    if default_schema:
-        tables = list(tables)
-        for i, table in enumerate(tables):
-            if table.schema is None:
-                tables[i] = Table(table.table, default_schema, table.catalog)
-
-    # load existing tables
     predicate = or_(
         *[
             and_(
                 NewTable.database_id == database_id,
-                NewTable.schema == table.schema,
+                NewTable.schema == (table.schema or default_schema),
                 NewTable.name == table.table,
             )
             for table in tables
         ]
     )
-    new_tables = session.query(NewTable).filter(predicate).all()
-
-    # use original database model to get the engine
-    engine = (
-        session.query(OriginalDatabase)
-        .filter_by(id=database_id)
-        .one()
-        .get_sqla_engine(default_schema)
-    )
-    inspector = inspect(engine)
-
-    # add missing tables

Review comment:
       This logic for syncing table schemas from the datasources has been removed. 
It should live in a separate offline script instead.
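
   For reference, a bare-bones sketch of what that offline script could do, mirroring the inspector logic removed above; the `sync_physical_columns` name is hypothetical, and `database` stands for the original Database model (the one that provides `get_sqla_engine`):

```python
# Hypothetical offline sync (illustrative only): refresh the physical columns of
# one NewTable from the underlying datasource, mirroring the removed logic.
from sqlalchemy import inspect
from sqlalchemy.orm import Session


def sync_physical_columns(session: Session, database, table: NewTable) -> None:
    engine = database.get_sqla_engine(table.schema)
    quote = engine.dialect.identifier_preparer.quote
    inspector = inspect(engine)
    table.columns = [
        NewColumn(
            name=column["name"],
            type=str(column["type"]),
            expression=quote(column["name"]),
            is_temporal=column["type"].python_type.__name__.upper() in TEMPORAL_TYPES,
            is_aggregation=False,
            is_physical=True,
            is_spatial=False,
            is_partition=False,
            is_increase_desired=True,
        )
        for column in inspector.get_columns(table.name, schema=table.schema)
    ]
    session.commit()
```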

##########
File path: superset/migrations/versions/07071313dd52_change_fetch_values_predicate_to_text.py
##########
@@ -30,9 +30,7 @@
 
 import sqlalchemy as sa
 from alembic import op
-from sqlalchemy import and_, func, or_
-from sqlalchemy.dialects import postgresql
-from sqlalchemy.sql.schema import Table

Review comment:
       Bycatch: clean up unused imports.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@superset.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


