ktmud commented on a change in pull request #19421: URL: https://github.com/apache/superset/pull/19421#discussion_r839902326
########## File path: superset/connectors/base/models.py ########## @@ -586,7 +586,7 @@ class BaseColumn(AuditMixinNullable, ImportExportMixin): type = Column(Text) groupby = Column(Boolean, default=True) filterable = Column(Boolean, default=True) - description = Column(Text) + description = Column(MediumText()) Review comment: `MediumText` is the current type for these fields. They were updated in db migrations at some point. Updating for consistency. ########## File path: superset/connectors/sqla/models.py ########## @@ -130,6 +131,7 @@ "sum", "doubleSum", } +ADDITIVE_METRIC_TYPES_LOWER = {op.lower() for op in ADDITIVE_METRIC_TYPES} Review comment: `metric_type.lower()` is compared with `doubleSum`, which will always evaluate to `False`. Not sure if the original casing will be used elsewhere, so I added a new variable. ########## File path: superset/migrations/shared/utils.py ########## @@ -84,18 +97,29 @@ def find_nodes_by_key(element: Any, target: str) -> Iterator[Any]: yield from find_nodes_by_key(value, target) -def extract_table_references(sql_text: str, sqla_dialect: str) -> Set[Table]: +RE_JINJA_VAR = re.compile(r"\{\{[^\{\}]+\}\}") +RE_JINJA_BLOCK = re.compile(r"\{[%#][^\{\}%#]+[%#]\}") + + +def extract_table_references( + sql_text: str, sqla_dialect: str, show_warning=True ) -> Set[Table]: """ Return all the dependencies from a SQL sql_text. """ dialect = "generic" for dialect, sqla_dialects in sqloxide_dialects.items(): if sqla_dialect in sqla_dialects: break + sql_text = RE_JINJA_BLOCK.sub(" ", sql_text) + sql_text = RE_JINJA_VAR.sub("abc", sql_text) Review comment: Interpolate Jinja vars to give `sqloxide` a higher chance of successfully parsing the SQL text. ########## File path: superset/connectors/sqla/models.py ########## @@ -522,7 +524,7 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho foreign_keys=[database_id], ) schema = Column(String(255)) - sql = Column(Text) + sql = Column(MediumText()) Review comment: I only found out some columns need to be `MediumText` after I noticed SQL parsing was failing because some of the SQL statements were cut off when copying to the new table.
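Two of the comments above are easier to see with tiny, self-contained sketches. First, the case-sensitivity bug that `ADDITIVE_METRIC_TYPES_LOWER` works around (the set below is abbreviated; the real one lives in `superset/connectors/sqla/models.py`):

```python
ADDITIVE_METRIC_TYPES = {"count", "sum", "doubleSum"}  # abbreviated

metric_type = "doubleSum"
# The old check lowercases the metric type but compares it against the
# mixed-case set, so "doublesum" can never match "doubleSum":
print(metric_type.lower() in ADDITIVE_METRIC_TYPES)        # False

# Comparing against a lowercased copy of the set fixes the check:
ADDITIVE_METRIC_TYPES_LOWER = {op.lower() for op in ADDITIVE_METRIC_TYPES}
print(metric_type.lower() in ADDITIVE_METRIC_TYPES_LOWER)  # True
```

Second, the Jinja pre-processing in `extract_table_references`. The regexes are copied from the diff above; the sample query and template calls are made up for illustration. The idea is that control blocks are blanked out and variables are replaced with a dummy literal so sqloxide sees syntactically valid SQL:

```python
import re

RE_JINJA_VAR = re.compile(r"\{\{[^\{\}]+\}\}")
RE_JINJA_BLOCK = re.compile(r"\{[%#][^\{\}%#]+[%#]\}")

sql_text = """
SELECT id, ds
FROM schema1.my_table
{% if from_dttm %}
WHERE ds >= {{ from_dttm }} AND owner = {{ current_username() }}
{% endif %}
"""

sql_text = RE_JINJA_BLOCK.sub(" ", sql_text)   # {% ... %} blocks collapse to spaces
sql_text = RE_JINJA_VAR.sub("abc", sql_text)   # {{ ... }} vars become the literal abc
print(sql_text)
# The template syntax is gone; what remains parses as plain SQL, so the
# table references (here schema1.my_table) can still be extracted.
```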
########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -207,427 +241,481 @@ class NewTable(Base): columns: List[NewColumn] = relationship( "NewColumn", secondary=table_column_association_table, cascade="all, delete" ) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) -class NewDataset(Base): +class NewDataset(Base, AuxiliaryColumnsMixin): __tablename__ = "sl_datasets" id = sa.Column(sa.Integer, primary_key=True) sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True) name = sa.Column(sa.Text) - expression = sa.Column(sa.Text) + expression = sa.Column(MediumText()) + is_physical = sa.Column(sa.Boolean, default=False) + is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) + external_url = sa.Column(sa.Text, nullable=True) + extra_json = sa.Column(sa.Text, default="{}") tables: List[NewTable] = relationship( "NewTable", secondary=dataset_table_association_table ) columns: List[NewColumn] = relationship( "NewColumn", secondary=dataset_column_association_table, cascade="all, delete" ) - is_physical = sa.Column(sa.Boolean, default=False) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"} -def load_or_create_tables( +def find_tables( session: Session, database_id: int, default_schema: Optional[str], tables: Set[Table], - conditional_quote: Callable[[str], str], -) -> List[NewTable]: +) -> List[int]: """ - Load or create new table model instances. + Look for NewTable's of from a specific database """ if not tables: return [] - # set the default schema in tables that don't have it - if default_schema: - tables = list(tables) - for i, table in enumerate(tables): - if table.schema is None: - tables[i] = Table(table.table, default_schema, table.catalog) - - # load existing tables predicate = or_( *[ and_( NewTable.database_id == database_id, - NewTable.schema == table.schema, + NewTable.schema == (table.schema or default_schema), NewTable.name == table.table, ) for table in tables ] ) - new_tables = session.query(NewTable).filter(predicate).all() - - # use original database model to get the engine - engine = ( - session.query(OriginalDatabase) - .filter_by(id=database_id) - .one() - .get_sqla_engine(default_schema) - ) - inspector = inspect(engine) - - # add missing tables - existing = {(table.schema, table.name) for table in new_tables} - for table in tables: - if (table.schema, table.table) not in existing: - column_metadata = inspector.get_columns(table.table, schema=table.schema) - columns = [ - NewColumn( - name=column["name"], - type=str(column["type"]), - expression=conditional_quote(column["name"]), - is_temporal=column["type"].python_type.__name__.upper() - in TEMPORAL_TYPES, - is_aggregation=False, - is_physical=True, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - ) - for column in column_metadata - ] - new_tables.append( - NewTable( - name=table.table, - schema=table.schema, - catalog=None, - database_id=database_id, - columns=columns, - ) - ) - existing.add((table.schema, table.table)) + return session.query(NewTable.id).filter(predicate).all() - return new_tables +# helper SQLA elements for easier querying +is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "") -def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals - """ - Copy old 
datasets to the new models. - """ - session = inspect(target).session +# filtering out columns and metrics with valid associated SqlTable +active_table_columns = sa.join( + TableColumn, + SqlaTable, + and_( + TableColumn.table_id == SqlaTable.id, + TableColumn.is_active, + ), +) +active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id) - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).first() - ) - if not database: - return - url = make_url(database.sqlalchemy_uri) - dialect_class = url.get_dialect() - conditional_quote = dialect_class().identifier_preparer.quote - - # create columns - columns = [] - for column in target.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. - if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None or column.expression == "", - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # create metrics - for metric in target.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES +def copy_tables(session: Session) -> None: + """Copy Physical tables""" + count = session.query(SqlaTable).filter(is_physical_table).count() + print(f">> Copy {count:,} physical tables to `sl_tables`...") + insert_from_select( + "sl_tables", + select( + [ + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.table_name.label("name"), + SqlaTable.schema, + SqlaTable.database_id, + SqlaTable.is_managed_externally, + SqlaTable.external_url, + ] ) + # use an inner join to filter out only tables with valid database ids + .select_from( + sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id) + ).where(is_physical_table), + ) - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # physical dataset - if not target.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=target.table_name, - schema=target.schema, - 
catalog=None, # currently not supported - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - referenced_tables = extract_table_references(target.sql, dialect_class.name) - tables = load_or_create_tables( - session, - target.database_id, - target.schema, - referenced_tables, - conditional_quote, - ) +def copy_datasets(session: Session) -> None: + """Copy all datasets""" + count = session.query(SqlaTable).count() + print(f">> Copy {count:,} SqlaTable to `sl_datasets`...") + insert_from_select( + "sl_datasets", + select( + [ + # keep the ids the same for easier migration of relationships + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.id.label("sqlatable_id"), + SqlaTable.table_name.label("name"), + func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"), + is_physical_table.label("is_physical"), + SqlaTable.is_managed_externally, + SqlaTable.external_url, + SqlaTable.extra.label("extra_json"), + ] + ), + ) - # create the new dataset - dataset = NewDataset( - sqlatable_id=target.id, - name=target.table_name, - expression=target.sql or conditional_quote(target.table_name), - tables=tables, - columns=columns, - is_physical=not target.sql, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, + print(" Link physical datasets with tables...") + # Physical datasets (tables) have the same dataset.id and table.id + # as both are from SqlaTable.id + insert_from_select( + "sl_dataset_tables", + select( + [ + NewTable.id.label("dataset_id"), + NewTable.id.label("table_id"), + ] + ), ) - session.add(dataset) -def upgrade(): - # Create tables for the new models. 
- op.create_table( +def copy_columns(session: Session) -> None: + """Copy columns with active associated SqlTable""" + count = session.query(TableColumn).select_from(active_table_columns).count() + print(f">> Copy {count:,} active table columns to `sl_columns`...") + insert_from_select( "sl_columns", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), - sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Column - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column("type", sa.TEXT(), nullable=False), - sa.Column("expression", sa.TEXT(), nullable=False), - sa.Column( - "is_physical", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column("description", sa.TEXT(), nullable=True), - sa.Column("warning_text", sa.TEXT(), nullable=True), - sa.Column("unit", sa.TEXT(), nullable=True), - sa.Column("is_temporal", sa.BOOLEAN(), nullable=False), - sa.Column( - "is_spatial", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_partition", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_aggregation", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_additive", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_increase_desired", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - ), - sa.Column("external_url", sa.Text(), nullable=True), - sa.PrimaryKeyConstraint("id"), + select( + [ + # keep the same column.id so later relationships can be added easier + TableColumn.id, + TableColumn.uuid, + TableColumn.created_on, + TableColumn.changed_on, + TableColumn.column_name.label("name"), + TableColumn.description, + func.coalesce(TableColumn.expression, TableColumn.column_name).label( + "expression" + ), + sa.literal(False).label("is_aggregation"), + (TableColumn.expression.is_(None) | TableColumn.expression == "").label( + "is_physical" + ), + TableColumn.is_dttm.label("is_temporal"), + func.coalesce(TableColumn.type, "Unknown").label("type"), + TableColumn.extra.label("extra_json"), + ] + ).select_from(active_table_columns), ) - with op.batch_alter_table("sl_columns") as batch_op: - batch_op.create_unique_constraint("uq_sl_columns_uuid", ["uuid"]) - op.create_table( - "sl_tables", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), - sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Table - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("database_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.Column("catalog", sa.TEXT(), nullable=True), - sa.Column("schema", sa.TEXT(), nullable=True), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - 
), - sa.Column("external_url", sa.Text(), nullable=True), - sa.ForeignKeyConstraint(["database_id"], ["dbs.id"], name="sl_tables_ibfk_1"), - sa.PrimaryKeyConstraint("id"), + print(" Link physical table columns to `sl_tables`...") + insert_from_select( + "sl_table_columns", + select( + [ + TableColumn.table_id, + TableColumn.id.label("column_id"), + ] Review comment: Since table and column ids are the same, we can just fill the association table with ids from the original data. ########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -53,6 +61,31 @@ DB_CONNECTION_MUTATOR = app.config["DB_CONNECTION_MUTATOR"] +class AuxiliaryColumnsMixin: + """ + Auxiliary columns, a combination of columns added by + AuditMixin + ImportExportMixin + """ + + created_on = sa.Column(sa.DateTime, default=datetime.now, nullable=True) + changed_on = sa.Column( + sa.DateTime, default=datetime.now, onupdate=datetime.now, nullable=True + ) + uuid = sa.Column( + UUIDType(binary=True), primary_key=False, unique=True, default=uuid4 + ) Review comment: Previous migration does not have these columns in the new tables, which resulted them having `null` values. ########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -207,427 +241,492 @@ class NewTable(Base): columns: List[NewColumn] = relationship( "NewColumn", secondary=table_column_association_table, cascade="all, delete" ) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) -class NewDataset(Base): +class NewDataset(Base, AuxiliaryColumnsMixin): __tablename__ = "sl_datasets" id = sa.Column(sa.Integer, primary_key=True) sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True) name = sa.Column(sa.Text) - expression = sa.Column(sa.Text) + expression = sa.Column(MediumText()) + is_physical = sa.Column(sa.Boolean, default=False) + is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) + external_url = sa.Column(sa.Text, nullable=True) + extra_json = sa.Column(sa.Text, default="{}") tables: List[NewTable] = relationship( "NewTable", secondary=dataset_table_association_table ) columns: List[NewColumn] = relationship( "NewColumn", secondary=dataset_column_association_table, cascade="all, delete" ) - is_physical = sa.Column(sa.Boolean, default=False) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"} -def load_or_create_tables( +def find_tables( session: Session, database_id: int, default_schema: Optional[str], tables: Set[Table], - conditional_quote: Callable[[str], str], -) -> List[NewTable]: +) -> List[int]: """ - Load or create new table model instances. 
+ Look for NewTable's of from a specific database """ if not tables: return [] - # set the default schema in tables that don't have it - if default_schema: - tables = list(tables) - for i, table in enumerate(tables): - if table.schema is None: - tables[i] = Table(table.table, default_schema, table.catalog) - - # load existing tables predicate = or_( *[ and_( NewTable.database_id == database_id, - NewTable.schema == table.schema, + NewTable.schema == (table.schema or default_schema), NewTable.name == table.table, ) for table in tables ] ) - new_tables = session.query(NewTable).filter(predicate).all() - - # use original database model to get the engine - engine = ( - session.query(OriginalDatabase) - .filter_by(id=database_id) - .one() - .get_sqla_engine(default_schema) - ) - inspector = inspect(engine) - - # add missing tables - existing = {(table.schema, table.name) for table in new_tables} - for table in tables: - if (table.schema, table.table) not in existing: - column_metadata = inspector.get_columns(table.table, schema=table.schema) - columns = [ - NewColumn( - name=column["name"], - type=str(column["type"]), - expression=conditional_quote(column["name"]), - is_temporal=column["type"].python_type.__name__.upper() - in TEMPORAL_TYPES, - is_aggregation=False, - is_physical=True, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - ) - for column in column_metadata - ] - new_tables.append( - NewTable( - name=table.table, - schema=table.schema, - catalog=None, - database_id=database_id, - columns=columns, - ) - ) - existing.add((table.schema, table.table)) + return session.query(NewTable.id).filter(predicate).all() - return new_tables +# helper SQLA elements for easier querying +is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "") -def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals - """ - Copy old datasets to the new models. - """ - session = inspect(target).session +# filtering out columns and metrics with valid associated SqlTable +active_table_columns = sa.join( + TableColumn, + SqlaTable, + and_( + TableColumn.table_id == SqlaTable.id, + TableColumn.is_active, + ), +) +active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id) - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).first() - ) - if not database: - return - url = make_url(database.sqlalchemy_uri) - dialect_class = url.get_dialect() - conditional_quote = dialect_class().identifier_preparer.quote - - # create columns - columns = [] - for column in target.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. 
- if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None or column.expression == "", - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # create metrics - for metric in target.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES +def copy_tables(session: Session) -> None: + """Copy Physical tables""" + count = session.query(SqlaTable).filter(is_physical_table).count() + print(f">> Copy {count:,} physical tables to `sl_tables`...") + insert_from_select( + "sl_tables", + select( + [ + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.table_name.label("name"), + SqlaTable.schema, + SqlaTable.database_id, + SqlaTable.is_managed_externally, + SqlaTable.external_url, + ] ) + # use an inner join to filter out only tables with valid database ids + .select_from( + sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id) + ).where(is_physical_table), + ) - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # physical dataset - if not target.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=target.table_name, - schema=target.schema, - catalog=None, # currently not supported - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - referenced_tables = extract_table_references(target.sql, dialect_class.name) - tables = load_or_create_tables( - session, - target.database_id, - target.schema, - referenced_tables, - conditional_quote, - ) +def copy_datasets(session: Session) -> None: + """Copy all datasets""" + count = session.query(SqlaTable).count() + print(f">> Copy {count:,} SqlaTable to `sl_datasets`...") + insert_from_select( + "sl_datasets", + select( + [ + # keep the ids the same for easier migration of relationships + 
SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.id.label("sqlatable_id"), + SqlaTable.table_name.label("name"), + func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"), Review comment: table_name quotes will be added later. ########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -207,427 +241,481 @@ class NewTable(Base): columns: List[NewColumn] = relationship( "NewColumn", secondary=table_column_association_table, cascade="all, delete" ) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) -class NewDataset(Base): +class NewDataset(Base, AuxiliaryColumnsMixin): __tablename__ = "sl_datasets" id = sa.Column(sa.Integer, primary_key=True) sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True) name = sa.Column(sa.Text) - expression = sa.Column(sa.Text) + expression = sa.Column(MediumText()) + is_physical = sa.Column(sa.Boolean, default=False) + is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) + external_url = sa.Column(sa.Text, nullable=True) + extra_json = sa.Column(sa.Text, default="{}") tables: List[NewTable] = relationship( "NewTable", secondary=dataset_table_association_table ) columns: List[NewColumn] = relationship( "NewColumn", secondary=dataset_column_association_table, cascade="all, delete" ) - is_physical = sa.Column(sa.Boolean, default=False) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"} -def load_or_create_tables( +def find_tables( session: Session, database_id: int, default_schema: Optional[str], tables: Set[Table], - conditional_quote: Callable[[str], str], -) -> List[NewTable]: +) -> List[int]: """ - Load or create new table model instances. 
+ Look for NewTable's of from a specific database """ if not tables: return [] - # set the default schema in tables that don't have it - if default_schema: - tables = list(tables) - for i, table in enumerate(tables): - if table.schema is None: - tables[i] = Table(table.table, default_schema, table.catalog) - - # load existing tables predicate = or_( *[ and_( NewTable.database_id == database_id, - NewTable.schema == table.schema, + NewTable.schema == (table.schema or default_schema), NewTable.name == table.table, ) for table in tables ] ) - new_tables = session.query(NewTable).filter(predicate).all() - - # use original database model to get the engine - engine = ( - session.query(OriginalDatabase) - .filter_by(id=database_id) - .one() - .get_sqla_engine(default_schema) - ) - inspector = inspect(engine) - - # add missing tables - existing = {(table.schema, table.name) for table in new_tables} - for table in tables: - if (table.schema, table.table) not in existing: - column_metadata = inspector.get_columns(table.table, schema=table.schema) - columns = [ - NewColumn( - name=column["name"], - type=str(column["type"]), - expression=conditional_quote(column["name"]), - is_temporal=column["type"].python_type.__name__.upper() - in TEMPORAL_TYPES, - is_aggregation=False, - is_physical=True, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - ) - for column in column_metadata - ] - new_tables.append( - NewTable( - name=table.table, - schema=table.schema, - catalog=None, - database_id=database_id, - columns=columns, - ) - ) - existing.add((table.schema, table.table)) + return session.query(NewTable.id).filter(predicate).all() - return new_tables +# helper SQLA elements for easier querying +is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "") -def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals - """ - Copy old datasets to the new models. - """ - session = inspect(target).session +# filtering out columns and metrics with valid associated SqlTable +active_table_columns = sa.join( + TableColumn, + SqlaTable, + and_( + TableColumn.table_id == SqlaTable.id, + TableColumn.is_active, + ), +) +active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id) - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).first() - ) - if not database: - return - url = make_url(database.sqlalchemy_uri) - dialect_class = url.get_dialect() - conditional_quote = dialect_class().identifier_preparer.quote - - # create columns - columns = [] - for column in target.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. 
- if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None or column.expression == "", - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # create metrics - for metric in target.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES +def copy_tables(session: Session) -> None: + """Copy Physical tables""" + count = session.query(SqlaTable).filter(is_physical_table).count() + print(f">> Copy {count:,} physical tables to `sl_tables`...") + insert_from_select( + "sl_tables", + select( + [ + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.table_name.label("name"), + SqlaTable.schema, + SqlaTable.database_id, + SqlaTable.is_managed_externally, + SqlaTable.external_url, + ] ) + # use an inner join to filter out only tables with valid database ids + .select_from( + sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id) + ).where(is_physical_table), + ) - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # physical dataset - if not target.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=target.table_name, - schema=target.schema, - catalog=None, # currently not supported - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - referenced_tables = extract_table_references(target.sql, dialect_class.name) - tables = load_or_create_tables( - session, - target.database_id, - target.schema, - referenced_tables, - conditional_quote, - ) +def copy_datasets(session: Session) -> None: + """Copy all datasets""" + count = session.query(SqlaTable).count() + print(f">> Copy {count:,} SqlaTable to `sl_datasets`...") + insert_from_select( + "sl_datasets", + select( + [ + # keep the ids the same for easier migration of relationships + 
SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.id.label("sqlatable_id"), + SqlaTable.table_name.label("name"), + func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"), + is_physical_table.label("is_physical"), + SqlaTable.is_managed_externally, + SqlaTable.external_url, + SqlaTable.extra.label("extra_json"), + ] + ), + ) - # create the new dataset - dataset = NewDataset( - sqlatable_id=target.id, - name=target.table_name, - expression=target.sql or conditional_quote(target.table_name), - tables=tables, - columns=columns, - is_physical=not target.sql, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, + print(" Link physical datasets with tables...") + # Physical datasets (tables) have the same dataset.id and table.id + # as both are from SqlaTable.id + insert_from_select( + "sl_dataset_tables", + select( + [ + NewTable.id.label("dataset_id"), + NewTable.id.label("table_id"), + ] + ), ) - session.add(dataset) -def upgrade(): - # Create tables for the new models. - op.create_table( Review comment: The manual duplicate specification of these `create_table` command is not needed anymore. Tables are now created with `Base.metadata.create_all(bind=bind, tables=new_tables)`. ########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -150,55 +180,66 @@ def fetch_columns_and_metrics(self, session: Session) -> None: Base.metadata, sa.Column("table_id", sa.ForeignKey("sl_tables.id")), sa.Column("column_id", sa.ForeignKey("sl_columns.id")), + UniqueConstraint("table_id", "column_id"), ) dataset_column_association_table = sa.Table( "sl_dataset_columns", Base.metadata, sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")), sa.Column("column_id", sa.ForeignKey("sl_columns.id")), + UniqueConstraint("dataset_id", "column_id"), ) dataset_table_association_table = sa.Table( "sl_dataset_tables", Base.metadata, sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")), sa.Column("table_id", sa.ForeignKey("sl_tables.id")), + UniqueConstraint("dataset_id", "table_id"), ) -class NewColumn(Base): +class NewColumn(Base, AuxiliaryColumnsMixin): __tablename__ = "sl_columns" id = sa.Column(sa.Integer, primary_key=True) name = sa.Column(sa.Text) type = sa.Column(sa.Text) - expression = sa.Column(sa.Text) + expression = sa.Column(MediumText()) + + # TODO: jesseyang + # this should probably be nullable=False and default=False + # do a migration later is_physical = sa.Column(sa.Boolean, default=True) - description = sa.Column(sa.Text) - warning_text = sa.Column(sa.Text) + + description = sa.Column(MediumText()) + warning_text = sa.Column(MediumText()) + unit = sa.Column(sa.Text) Review comment: @betodealmeida `unit` is in [superset.columns.models](https://github.com/apache/superset/blob/a619cb4ea98342a2fdf7f77587d8aa078c7dccef/superset/columns/models.py#L75) but not in the migration script. Should I keep this or not? 
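One of the comments above notes that the hand-written `op.create_table` blocks were dropped in favor of `Base.metadata.create_all(bind=bind, tables=new_tables)`. A trimmed-down sketch of that pattern inside an Alembic migration (the model here is a stand-in with only a couple of columns; the real migration defines the full `NewTable`/`NewDataset`/`NewColumn` models):

```python
import sqlalchemy as sa
from alembic import op
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class NewTable(Base):
    # Stand-in for the real model; the migration's models carry many more columns.
    __tablename__ = "sl_tables"
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.Text)


def upgrade() -> None:
    bind = op.get_bind()
    # create_all only touches the tables passed via `tables`, so pre-existing
    # tables registered on the same metadata are left alone, and the column
    # definitions live in one place (the model classes) instead of being
    # duplicated in op.create_table calls.
    new_tables = [NewTable.__table__]
    Base.metadata.create_all(bind=bind, tables=new_tables)
```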
########## File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py ########## @@ -207,427 +241,481 @@ class NewTable(Base): columns: List[NewColumn] = relationship( "NewColumn", secondary=table_column_association_table, cascade="all, delete" ) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) -class NewDataset(Base): +class NewDataset(Base, AuxiliaryColumnsMixin): __tablename__ = "sl_datasets" id = sa.Column(sa.Integer, primary_key=True) sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True) name = sa.Column(sa.Text) - expression = sa.Column(sa.Text) + expression = sa.Column(MediumText()) + is_physical = sa.Column(sa.Boolean, default=False) + is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) + external_url = sa.Column(sa.Text, nullable=True) + extra_json = sa.Column(sa.Text, default="{}") tables: List[NewTable] = relationship( "NewTable", secondary=dataset_table_association_table ) columns: List[NewColumn] = relationship( "NewColumn", secondary=dataset_column_association_table, cascade="all, delete" ) - is_physical = sa.Column(sa.Boolean, default=False) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"} -def load_or_create_tables( +def find_tables( session: Session, database_id: int, default_schema: Optional[str], tables: Set[Table], - conditional_quote: Callable[[str], str], -) -> List[NewTable]: +) -> List[int]: """ - Load or create new table model instances. + Look for NewTable's of from a specific database """ if not tables: return [] - # set the default schema in tables that don't have it - if default_schema: - tables = list(tables) - for i, table in enumerate(tables): - if table.schema is None: - tables[i] = Table(table.table, default_schema, table.catalog) - - # load existing tables predicate = or_( *[ and_( NewTable.database_id == database_id, - NewTable.schema == table.schema, + NewTable.schema == (table.schema or default_schema), NewTable.name == table.table, ) for table in tables ] ) - new_tables = session.query(NewTable).filter(predicate).all() - - # use original database model to get the engine - engine = ( - session.query(OriginalDatabase) - .filter_by(id=database_id) - .one() - .get_sqla_engine(default_schema) - ) - inspector = inspect(engine) - - # add missing tables - existing = {(table.schema, table.name) for table in new_tables} - for table in tables: - if (table.schema, table.table) not in existing: - column_metadata = inspector.get_columns(table.table, schema=table.schema) - columns = [ - NewColumn( - name=column["name"], - type=str(column["type"]), - expression=conditional_quote(column["name"]), - is_temporal=column["type"].python_type.__name__.upper() - in TEMPORAL_TYPES, - is_aggregation=False, - is_physical=True, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - ) - for column in column_metadata - ] - new_tables.append( - NewTable( - name=table.table, - schema=table.schema, - catalog=None, - database_id=database_id, - columns=columns, - ) - ) - existing.add((table.schema, table.table)) + return session.query(NewTable.id).filter(predicate).all() - return new_tables +# helper SQLA elements for easier querying +is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "") -def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals - """ - Copy old 
datasets to the new models. - """ - session = inspect(target).session +# filtering out columns and metrics with valid associated SqlTable +active_table_columns = sa.join( + TableColumn, + SqlaTable, + and_( + TableColumn.table_id == SqlaTable.id, + TableColumn.is_active, + ), +) +active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id) - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).first() - ) - if not database: - return - url = make_url(database.sqlalchemy_uri) - dialect_class = url.get_dialect() - conditional_quote = dialect_class().identifier_preparer.quote - - # create columns - columns = [] - for column in target.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. - if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None or column.expression == "", - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # create metrics - for metric in target.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES +def copy_tables(session: Session) -> None: + """Copy Physical tables""" + count = session.query(SqlaTable).filter(is_physical_table).count() + print(f">> Copy {count:,} physical tables to `sl_tables`...") + insert_from_select( + "sl_tables", + select( + [ + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.table_name.label("name"), + SqlaTable.schema, + SqlaTable.database_id, + SqlaTable.is_managed_externally, + SqlaTable.external_url, + ] ) + # use an inner join to filter out only tables with valid database ids + .select_from( + sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id) + ).where(is_physical_table), + ) - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - # physical dataset - if not target.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=target.table_name, - schema=target.schema, - 
catalog=None, # currently not supported - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - referenced_tables = extract_table_references(target.sql, dialect_class.name) - tables = load_or_create_tables( - session, - target.database_id, - target.schema, - referenced_tables, - conditional_quote, - ) +def copy_datasets(session: Session) -> None: + """Copy all datasets""" + count = session.query(SqlaTable).count() + print(f">> Copy {count:,} SqlaTable to `sl_datasets`...") + insert_from_select( + "sl_datasets", + select( + [ + # keep the ids the same for easier migration of relationships + SqlaTable.id, + SqlaTable.uuid, + SqlaTable.created_on, + SqlaTable.changed_on, + SqlaTable.id.label("sqlatable_id"), + SqlaTable.table_name.label("name"), + func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"), + is_physical_table.label("is_physical"), + SqlaTable.is_managed_externally, + SqlaTable.external_url, + SqlaTable.extra.label("extra_json"), + ] + ), + ) - # create the new dataset - dataset = NewDataset( - sqlatable_id=target.id, - name=target.table_name, - expression=target.sql or conditional_quote(target.table_name), - tables=tables, - columns=columns, - is_physical=not target.sql, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, + print(" Link physical datasets with tables...") + # Physical datasets (tables) have the same dataset.id and table.id + # as both are from SqlaTable.id + insert_from_select( + "sl_dataset_tables", + select( + [ + NewTable.id.label("dataset_id"), + NewTable.id.label("table_id"), + ] + ), ) - session.add(dataset) -def upgrade(): - # Create tables for the new models. 
- op.create_table( +def copy_columns(session: Session) -> None: + """Copy columns with active associated SqlTable""" + count = session.query(TableColumn).select_from(active_table_columns).count() + print(f">> Copy {count:,} active table columns to `sl_columns`...") + insert_from_select( "sl_columns", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), - sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Column - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column("type", sa.TEXT(), nullable=False), - sa.Column("expression", sa.TEXT(), nullable=False), - sa.Column( - "is_physical", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column("description", sa.TEXT(), nullable=True), - sa.Column("warning_text", sa.TEXT(), nullable=True), - sa.Column("unit", sa.TEXT(), nullable=True), - sa.Column("is_temporal", sa.BOOLEAN(), nullable=False), - sa.Column( - "is_spatial", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_partition", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_aggregation", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_additive", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_increase_desired", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - ), - sa.Column("external_url", sa.Text(), nullable=True), - sa.PrimaryKeyConstraint("id"), + select( + [ + # keep the same column.id so later relationships can be added easier + TableColumn.id, + TableColumn.uuid, + TableColumn.created_on, + TableColumn.changed_on, Review comment: I'm porting over the same `id`, `uuid`, `create_on`, `changed_on` from the original tables so relationship mapping can be easier. As the new tables are intended to fully replace the original tables, retaining these info would also be useful for end user experience (especially `changed_on` and `created_on`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@superset.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@superset.apache.org For additional commands, e-mail: notifications-h...@superset.apache.org