ktmud commented on code in PR #19421:
URL: https://github.com/apache/superset/pull/19421#discussion_r845905804


##########
superset/connectors/sqla/models.py:
##########
@@ -1895,105 +2018,77 @@ def before_update(
         ):
             raise Exception(get_dataset_exist_error_msg(target.full_name))
 
+    def get_sl_columns(self, existing: List[Column] = []) -> List[NewColumn]:
+        """
+        Convert `SqlaTable.columns` and `SqlaTable.metrics` to the new Column 
model
+        """
+        known_columns = {column.uuid: column for column in existing}
+        return [
+            item.to_sl_column(known_columns) for item in self.columns + 
self.metrics
+        ]
+
     @staticmethod
     def update_table(  # pylint: disable=unused-argument
         mapper: Mapper, connection: Connection, target: Union[SqlMetric, 
TableColumn]
     ) -> None:
         """
-        Forces an update to the table's changed_on value when a metric or 
column on the
-        table is updated. This busts the cache key for all charts that use the 
table.
-
         :param mapper: Unused.
         :param connection: Unused.
         :param target: The metric or column that was updated.
         """
         inspector = inspect(target)
         session = inspector.session
 
-        # get DB-specific conditional quoter for expressions that point to 
columns or
-        # table names
-        database = (
-            target.table.database
-            or session.query(Database).filter_by(id=target.database_id).one()
+        # Forces an update to the table's changed_on value when a metric or 
column on the
+        # table is updated. This busts the cache key for all charts that use 
the table.
+        values = dict(changed_on=target.changed_on, 
changed_by_fk=target.changed_by_fk)
+        session.execute(
+            update(SqlaTable).where(SqlaTable.id == 
target.table.id).values(**values)

Review Comment:
   Make the update more explicit; otherwise SQLA will SELECT the full entity 
first and then SET every property to the same value.



##########
superset/connectors/sqla/models.py:
##########
@@ -1745,13 +1859,16 @@ def fetch_metadata(self, commit: bool = True) -> 
MetadataResult:
         )
 
         # clear old columns before adding modified columns back
-        self.columns = []
+        columns = []
         for col in new_columns:
             old_column = old_columns_by_name.pop(col["name"], None)
             if not old_column:
                 results.added.append(col["name"])
                 new_column = TableColumn(
-                    column_name=col["name"], type=col["type"], table=self
+                    column_name=col["name"],
+                    type=col["type"],
+                    table_id=self.id,
+                    table=self,

Review Comment:
   Assigning `table=self` when creating the `new_column` 
(`TableColumn(table=self)`) will add the `new_column` to `self.columns` 
already---because SQLA magically knows how to update the reference. So 
`self.columns.append(..)` later actually added the column twice, i.e. 
`self.columns == [ds, ds, state, state, ...]`---this does not cause a problem 
in production because the duplicate columns are just the same instance of one 
column, but it will cause issues later when we loop through 
`sqla_table.columns` to generate `NewColumn`.
   
   The fix is to keep `columns` detached from `self` before all columns are 
added to the list.



##########
superset/migrations/shared/utils.py:
##########
@@ -73,42 +52,44 @@ def table_has_column(table: str, column: str) -> bool:
         return False
 
 
-def find_nodes_by_key(element: Any, target: str) -> Iterator[Any]:
-    """
-    Find all nodes in a SQL tree matching a given key.
-    """
-    if isinstance(element, list):
-        for child in element:
-            yield from find_nodes_by_key(child, target)
-    elif isinstance(element, dict):
-        for key, value in element.items():
-            if key == target:
-                yield value
-            else:
-                yield from find_nodes_by_key(value, target)
-
-
-def extract_table_references(sql_text: str, sqla_dialect: str) -> Set[Table]:

Review Comment:
   This is moved to `superset.sql_parse`



##########
tests/unit_tests/datasets/test_models.py:
##########
@@ -185,7 +190,7 @@ def test_dataset_attributes(app_context: None, session: 
Session) -> None:
 
     columns = [
         TableColumn(column_name="ds", is_dttm=1, type="TIMESTAMP"),
-        TableColumn(column_name="user_id", type="INTEGER"),

Review Comment:
   Change column name to avoid confusion. `user_id` looks too much like native 
Superset metadata columns.



##########
superset/connectors/sqla/models.py:
##########
@@ -2063,172 +2146,72 @@ def after_update(  # pylint: 
disable=too-many-branches, too-many-locals, too-man
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-        inspector = inspect(target)
+        # set permissions
+        security_manager.set_perm(mapper, connection, sqla_table)
+
+        inspector = inspect(sqla_table)
         session = inspector.session
 
         # double-check that ``UPDATE``s are actually pending (this method is 
called even
         # for instances that have no net changes to their column-based 
attributes)
-        if not session.is_modified(target, include_collections=True):
+        if not session.is_modified(sqla_table, include_collections=True):
             return
 
-        # set permissions
-        security_manager.set_perm(mapper, connection, target)
-
         dataset = (
-            
session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none()
+            
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
         )
         if not dataset:
+            sqla_table.write_shadow_dataset()
             return
 
-        # get DB-specific conditional quoter for expressions that point to 
columns or
-        # table names
-        database = (
-            target.database
-            or session.query(Database).filter_by(id=target.database_id).one()
-        )
-        engine = database.get_sqla_engine(schema=target.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
-
-        # update columns
-        if inspector.attrs.columns.history.has_changes():
-            # handle deleted columns
-            if inspector.attrs.columns.history.deleted:
-                column_names = {
-                    column.column_name
-                    for column in inspector.attrs.columns.history.deleted
-                }
-                dataset.columns = [
-                    column
-                    for column in dataset.columns
-                    if column.name not in column_names
-                ]
-
-            # handle inserted columns
-            for column in inspector.attrs.columns.history.added:
-                # ``is_active`` might be ``None``, but it defaults to ``True``.
-                if column.is_active is False:
-                    continue
-
-                extra_json = json.loads(column.extra or "{}")
-                for attr in {
-                    "groupby",
-                    "filterable",
-                    "verbose_name",
-                    "python_date_format",
-                }:
-                    value = getattr(column, attr)
-                    if value:
-                        extra_json[attr] = value
-
-                dataset.columns.append(
-                    NewColumn(
-                        name=column.column_name,
-                        type=column.type or "Unknown",
-                        expression=column.expression
-                        or conditional_quote(column.column_name),
-                        description=column.description,
-                        is_temporal=column.is_dttm,
-                        is_aggregation=False,
-                        is_physical=column.expression is None,
-                        is_spatial=False,
-                        is_partition=False,
-                        is_increase_desired=True,
-                        extra_json=json.dumps(extra_json) if extra_json else 
None,
-                        is_managed_externally=target.is_managed_externally,
-                        external_url=target.external_url,
-                    )
-                )
-
-        # update metrics
-        if inspector.attrs.metrics.history.has_changes():
-            # handle deleted metrics
-            if inspector.attrs.metrics.history.deleted:
-                column_names = {
-                    metric.metric_name
-                    for metric in inspector.attrs.metrics.history.deleted
-                }
-                dataset.columns = [
-                    column
-                    for column in dataset.columns
-                    if column.name not in column_names
-                ]
-
-            # handle inserted metrics
-            for metric in inspector.attrs.metrics.history.added:
-                extra_json = json.loads(metric.extra or "{}")
-                for attr in {"verbose_name", "metric_type", "d3format"}:
-                    value = getattr(metric, attr)
-                    if value:
-                        extra_json[attr] = value
-
-                is_additive = (
-                    metric.metric_type
-                    and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
-                )
-
-                dataset.columns.append(
-                    NewColumn(
-                        name=metric.metric_name,
-                        type="Unknown",
-                        expression=metric.expression,
-                        warning_text=metric.warning_text,
-                        description=metric.description,
-                        is_aggregation=True,
-                        is_additive=is_additive,
-                        is_physical=False,
-                        is_spatial=False,
-                        is_partition=False,
-                        is_increase_desired=True,
-                        extra_json=json.dumps(extra_json) if extra_json else 
None,
-                        is_managed_externally=target.is_managed_externally,
-                        external_url=target.external_url,
-                    )
-                )
+        # sync column list and delete removed columns
+        if (
+            inspector.attrs.columns.history.has_changes()
+            or inspector.attrs.metrics.history.has_changes()
+        ):
+            # pre-assign uuid after new columns or metrics are inserted so
+            # the related `NewColumn` can have a deterministic uuid, too
+            uuids = []
+            for item in sqla_table.columns + sqla_table.metrics:
+                if not item.uuid:
+                    item.uuid = uuid4()
+                else:
+                    uuids.append(item.uuid)
+            existing_columns = (
+                
session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)).all()
+            )
+            dataset.columns = sqla_table.get_sl_columns(existing_columns)

Review Comment:
   Now we just always re-create or update all NewColumn's as long as there is 
any update to `sqla_table.columns` or `sqla_table.metrics`.



##########
superset/connectors/sqla/models.py:
##########
@@ -1895,105 +2018,77 @@ def before_update(
         ):
             raise Exception(get_dataset_exist_error_msg(target.full_name))
 
+    def get_sl_columns(self, existing: List[Column] = []) -> List[NewColumn]:
+        """
+        Convert `SqlaTable.columns` and `SqlaTable.metrics` to the new Column 
model
+        """
+        known_columns = {column.uuid: column for column in existing}
+        return [
+            item.to_sl_column(known_columns) for item in self.columns + 
self.metrics
+        ]
+
     @staticmethod
     def update_table(  # pylint: disable=unused-argument
         mapper: Mapper, connection: Connection, target: Union[SqlMetric, 
TableColumn]
     ) -> None:
         """
-        Forces an update to the table's changed_on value when a metric or 
column on the
-        table is updated. This busts the cache key for all charts that use the 
table.
-
         :param mapper: Unused.
         :param connection: Unused.
         :param target: The metric or column that was updated.
         """
         inspector = inspect(target)
         session = inspector.session
 
-        # get DB-specific conditional quoter for expressions that point to 
columns or
-        # table names
-        database = (
-            target.table.database
-            or session.query(Database).filter_by(id=target.database_id).one()
+        # Forces an update to the table's changed_on value when a metric or 
column on the
+        # table is updated. This busts the cache key for all charts that use 
the table.
+        values = dict(changed_on=target.changed_on, 
changed_by_fk=target.changed_by_fk)
+        session.execute(
+            update(SqlaTable).where(SqlaTable.id == 
target.table.id).values(**values)
         )
-        engine = database.get_sqla_engine(schema=target.table.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
-
-        session.execute(update(SqlaTable).where(SqlaTable.id == 
target.table.id))
-
-        dataset = (
-            session.query(NewDataset)
-            .filter_by(sqlatable_id=target.table.id)
-            .one_or_none()
+        session.execute(
+            update(NewDataset)
+            .where(
+                NewDataset.id.in_(
+                    select([NewDataset.id]).select_from(
+                        sa.join(
+                            NewDataset, SqlaTable, NewDataset.uuid == 
SqlaTable.uuid
+                        )
+                    )
+                )
+            )
+            .values(**values)
         )
 
-        if not dataset:
-            # if dataset is not found create a new copy
-            # of the dataset instead of updating the existing
-
-            SqlaTable.write_shadow_dataset(target.table, database, session)
-            return
-
-        # update ``Column`` model as well
-        if isinstance(target, TableColumn):
-            columns = [
-                column
-                for column in dataset.columns
-                if column.name == target.column_name
-            ]
-            if not columns:
-                return
-
-            column = columns[0]
-            extra_json = json.loads(target.extra or "{}")
-            for attr in {"groupby", "filterable", "verbose_name", 
"python_date_format"}:
-                value = getattr(target, attr)
-                if value:
-                    extra_json[attr] = value
-
-            column.name = target.column_name
-            column.type = target.type or "Unknown"
-            column.expression = target.expression or conditional_quote(
-                target.column_name
+        # if table itself has changed, shadow-writing will happen in 
`after_update` anyway
+        if target.table not in session.dirty:

Review Comment:
   Only trigger an update for NewColumn when the table has no update 
itself---otherwise the after_update hook on table will trigger a full update 
anyway.



##########
superset/connectors/sqla/models.py:
##########
@@ -2237,29 +2205,34 @@ def after_update(  # pylint: disable=too-many-branches, 
too-many-locals, too-man
                 column.is_physical = False
 
             # update referenced tables if SQL changed
-            if inspector.attrs.sql.history.has_changes():
-                parsed = ParsedQuery(target.sql)
-                referenced_tables = parsed.tables
-
-                predicate = or_(
-                    *[
-                        and_(
-                            NewTable.schema == (table.schema or target.schema),
-                            NewTable.name == table.table,
-                        )
-                        for table in referenced_tables
-                    ]
+            if sqla_table.sql and inspector.attrs.sql.history.has_changes():
+                referenced_tables = extract_table_references(

Review Comment:
   Table name extraction now uses the same function used in db migration that 
utilizes `sqloxide`.



##########
superset/examples/birth_names.py:
##########
@@ -135,23 +135,26 @@ def _set_table_metadata(datasource: SqlaTable, database: 
"Database") -> None:
 
 
 def _add_table_metrics(datasource: SqlaTable) -> None:
-    if not any(col.column_name == "num_california" for col in 
datasource.columns):
+    # By accessing the attribute first, we make sure `datasource.columns` and
+    # `datasource.metrics` are already loaded. Otherwise accessing them later
+    # may trigger an unnecessary and unexpected `after_update` event.

Review Comment:
   The double-triggering of `after_update` used to break integration tests, 
raising "duplicate pk" error for newly created NewColumn's. I fixed the issue 
in other ways and made double triggering safe but kept this change since it's 
very minor.



##########
tests/integration_tests/commands_test.py:
##########
@@ -58,10 +58,13 @@ def test_is_valid_config(self):
 
 
 class TestImportAssetsCommand(SupersetTestCase):
-    @patch("superset.dashboards.commands.importers.v1.utils.g")
-    def test_import_assets(self, mock_g):

Review Comment:
   `mock_g` is somehow breaking in CI. Change to an alternative overriding 
solution.



##########
superset/connectors/sqla/models.py:
##########
@@ -2063,172 +2131,72 @@ def after_update(  # pylint: 
disable=too-many-branches, too-many-locals, too-man
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-        inspector = inspect(target)
+        # set permissions
+        security_manager.set_perm(mapper, connection, sqla_table)
+
+        inspector = inspect(sqla_table)
         session = inspector.session
 
         # double-check that ``UPDATE``s are actually pending (this method is 
called even
         # for instances that have no net changes to their column-based 
attributes)
-        if not session.is_modified(target, include_collections=True):
+        if not session.is_modified(sqla_table, include_collections=True):
             return
 
-        # set permissions
-        security_manager.set_perm(mapper, connection, target)
-
         dataset = (
-            
session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none()
+            
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
         )
         if not dataset:
+            sqla_table.write_shadow_dataset()
             return
 
-        # get DB-specific conditional quoter for expressions that point to 
columns or
-        # table names
-        database = (
-            target.database
-            or session.query(Database).filter_by(id=target.database_id).one()
-        )
-        engine = database.get_sqla_engine(schema=target.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
-
-        # update columns
-        if inspector.attrs.columns.history.has_changes():
-            # handle deleted columns
-            if inspector.attrs.columns.history.deleted:
-                column_names = {
-                    column.column_name
-                    for column in inspector.attrs.columns.history.deleted
-                }
-                dataset.columns = [
-                    column
-                    for column in dataset.columns
-                    if column.name not in column_names
-                ]
-
-            # handle inserted columns
-            for column in inspector.attrs.columns.history.added:
-                # ``is_active`` might be ``None``, but it defaults to ``True``.
-                if column.is_active is False:
-                    continue
-
-                extra_json = json.loads(column.extra or "{}")
-                for attr in {
-                    "groupby",
-                    "filterable",
-                    "verbose_name",
-                    "python_date_format",
-                }:
-                    value = getattr(column, attr)
-                    if value:
-                        extra_json[attr] = value
-
-                dataset.columns.append(
-                    NewColumn(
-                        name=column.column_name,
-                        type=column.type or "Unknown",
-                        expression=column.expression
-                        or conditional_quote(column.column_name),
-                        description=column.description,
-                        is_temporal=column.is_dttm,
-                        is_aggregation=False,
-                        is_physical=column.expression is None,
-                        is_spatial=False,
-                        is_partition=False,
-                        is_increase_desired=True,
-                        extra_json=json.dumps(extra_json) if extra_json else 
None,
-                        is_managed_externally=target.is_managed_externally,
-                        external_url=target.external_url,
-                    )

Review Comment:
   This is pulled to a `to_sl_column()` method so we can reuse the same logic 
for both table creation and updates.



##########
superset/migrations/shared/utils.py:
##########
@@ -73,42 +52,44 @@ def table_has_column(table: str, column: str) -> bool:
         return False
 
 
-def find_nodes_by_key(element: Any, target: str) -> Iterator[Any]:
-    """
-    Find all nodes in a SQL tree matching a given key.
-    """
-    if isinstance(element, list):
-        for child in element:
-            yield from find_nodes_by_key(child, target)
-    elif isinstance(element, dict):
-        for key, value in element.items():
-            if key == target:
-                yield value
-            else:
-                yield from find_nodes_by_key(value, target)
-
-
-def extract_table_references(sql_text: str, sqla_dialect: str) -> Set[Table]:
-    """
-    Return all the dependencies from a SQL sql_text.
-    """
-    if not parse_sql:
-        parsed = ParsedQuery(sql_text)
-        return parsed.tables
+uuid_by_dialect = {
+    MySQLDialect: "UNHEX(REPLACE(CONVERT(UUID() using utf8mb4), '-', ''))",
+    PGDialect: "uuid_in(md5(random()::text || 
clock_timestamp()::text)::cstring)",
+}
 
-    dialect = "generic"
-    for dialect, sqla_dialects in sqloxide_dialects.items():
-        if sqla_dialect in sqla_dialects:
-            break
-    try:
-        tree = parse_sql(sql_text, dialect=dialect)
-    except Exception:  # pylint: disable=broad-except
-        logger.warning("Unable to parse query with sqloxide: %s", sql_text)
-        # fallback to sqlparse
-        parsed = ParsedQuery(sql_text)
-        return parsed.tables
 
-    return {
-        Table(*[part["value"] for part in table["name"][::-1]])
-        for table in find_nodes_by_key(tree, "Table")
-    }
+def assign_uuids(

Review Comment:
   Reuse uuid generation from an earlier iteration.



##########
superset/connectors/sqla/models.py:
##########
@@ -417,6 +426,59 @@ def data(self) -> Dict[str, Any]:
 
         return attr_dict
 
+    def to_sl_column(
+        self, known_columns: Optional[Dict[str, NewColumn]] = None
+    ) -> NewColumn:
+        """Convert a TableColumn to NewColumn"""
+        column = known_columns.get(self.uuid) if known_columns else None
+        if not column:
+            column = NewColumn()

Review Comment:
   Create a new column only when we cannot find it from the existing columns 
list. The existing columns are filtered from local session states and the db 
using TableColumn and SqlMetrics uuids.



##########
tests/unit_tests/migrations/shared/utils_test.py:
##########
@@ -1,56 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# pylint: disable=import-outside-toplevel, unused-argument
-
-"""
-Test the SIP-68 migration.
-"""
-
-from pytest_mock import MockerFixture
-
-from superset.sql_parse import Table
-
-
-def test_extract_table_references(mocker: MockerFixture, app_context: None) -> 
None:

Review Comment:
   This is moved to `superset.sql_parse`



##########
tests/integration_tests/sqla_models_tests.py:
##########
@@ -455,7 +455,8 @@ def test_fetch_metadata_for_updated_virtual_table(self):
 
         # make sure the columns have been mapped properly
         assert len(table.columns) == 4
-        table.fetch_metadata()
+        table.fetch_metadata(commit=False)

Review Comment:
   If we do not delete this table, it could violate the unique `(column_name, 
table_id)` constraint if triggered multiple times.



##########
tests/unit_tests/conftest.py:
##########
@@ -31,25 +31,33 @@
 
 
 @pytest.fixture
-def session(mocker: MockFixture) -> Iterator[Session]:
+def get_session(mocker: MockFixture) -> Callable[[], Session]:
     """
     Create an in-memory SQLite session to test models.
     """
     engine = create_engine("sqlite://")
-    Session_ = sessionmaker(bind=engine)  # pylint: disable=invalid-name
-    in_memory_session = Session_()
 
-    # flask calls session.remove()
-    in_memory_session.remove = lambda: None
+    def get_session():
+        Session_ = sessionmaker(bind=engine)  # pylint: disable=invalid-name
+        in_memory_session = Session_()

Review Comment:
   Allow creating a new session to mimic re-querying persisted data.



##########
superset/tables/models.py:
##########
@@ -80,13 +108,95 @@ class Table(Model, AuditMixinNullable, ExtraJSONMixin, 
ImportExportMixin):
     schema = sa.Column(sa.Text)
     name = sa.Column(sa.Text)
 
-    # The relationship between tables and columns is 1:n, but we use a 
many-to-many
-    # association to differentiate between the relationship between datasets 
and
-    # columns.
-    columns: List[Column] = relationship(
-        "Column", secondary=association_table, cascade="all, delete"
-    )
-
     # Column is managed externally and should be read-only inside Superset
     is_managed_externally = sa.Column(sa.Boolean, nullable=False, 
default=False)
     external_url = sa.Column(sa.Text, nullable=True)
+
+    @property
+    def fullname(self) -> str:
+        return str(TableName(table=self.name, schema=self.schema, 
catalog=self.catalog))
+
+    def __repr__(self) -> str:
+        return f"<Table id={self.id} database_id={self.database_id} 
{self.fullname}>"
+
+    def sync_columns(self) -> None:
+        """Sync table columns with the database. Keep metadata for existing 
columns"""
+        try:
+            column_metadata = self.database.get_columns(self.name, self.schema)
+        except Exception:  # pylint: disable=broad-except
+            column_metadata = []
+
+        existing_columns = {column.name: column for column in self.columns}
+        quote_identifier = self.database.quote_identifier
+
+        def update_or_create_column(column_meta: Dict[str, Any]) -> Column:
+            column_name: str = column_meta["name"]
+            column_type: TypeEngine = column_meta["type"]
+            if column_name in existing_columns:
+                column = existing_columns[column_name]
+            else:
+                column = Column(name=column_name)
+            column.type = str(column_type)
+            column.is_temporal = is_column_type_temporal(column_type)
+            column.expression = quote_identifier(column_name)
+            column.is_aggregation = False
+            column.is_physical = True
+            column.is_spatial = False
+            column.is_partition = False  # TODO: update with accurate 
is_partition

Review Comment:
   Only update information fetched from database. Overridable attributes like 
`is_dimensional` and `is_filterable` are left unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to