robdiciuccio commented on a change in pull request #13561:
URL: https://github.com/apache/superset/pull/13561#discussion_r596365137
##########
File path: superset/utils/data.py
##########
@@ -161,5 +227,145 @@ def generate_data(columns: List[ColumnInfo], num_rows:
int) -> List[Dict[str, An
def generate_column_data(column: ColumnInfo, num_rows: int) -> List[Any]:
- func = get_type_generator(column["type"])
- return [func() for _ in range(num_rows)]
+ gen = get_type_generator(column["type"])
+ return [gen() for _ in range(num_rows)]
+
+
+def add_to_model(session: Session, model: Type[Model], count: int) ->
List[Model]:
+ """
+ Add entities of a given model.
+
+ :param Model model: a Superset/FAB model
+ :param int count: how many entities to generate and insert
+ """
+ inspector = inspect(model)
+
+ # select samples to copy relationship values
+ relationships = inspector.relationships.items()
+ samples = session.query(model).limit(count).all() if relationships else []
+
+ entities: List[Model] = []
+ max_primary_key: Optional[int] = None
+ for i in range(count):
+ sample = samples[i % len(samples)] if samples else None
+ kwargs = {}
+ for column in inspector.columns.values():
+ # for primary keys, keep incrementing
+ if column.primary_key:
+ if max_primary_key is None:
+ max_primary_key = (
+ session.query(func.max(getattr(model,
column.name))).scalar()
+ or 0
+ )
+ max_primary_key += 1
+ kwargs[column.name] = max_primary_key
+
+ # if the column has a foreign key, copy the value from an existing
entity
+ elif column.foreign_keys:
+ if sample:
+ kwargs[column.name] = getattr(sample, column.name)
+ else:
+ kwargs[column.name] = get_valid_foreign_key(column)
+
+ # should be an enum but it's not
+ elif column.name == "datasource_type":
+ kwargs[column.name] = "table"
+
+ # otherwise, generate a random value based on the type
+ else:
+ kwargs[column.name] = generate_value(column)
+
+ entities.append(model(**kwargs))
+
+ session.add_all(entities)
+ return entities
+
+
+def get_valid_foreign_key(column: Column) -> Any:
+ foreign_key = list(column.foreign_keys)[0]
+ table_name, column_name = foreign_key.target_fullname.split(".", 1)
+ return db.engine.execute(f"SELECT {column_name} FROM {table_name} LIMIT
1").scalar()
+
+
+def generate_value(column: Column) -> Any:
+ if hasattr(column.type, "enums"):
+ return random.choice(column.type.enums)
+
+ json_as_string = "json" in column.name.lower() and isinstance(
+ column.type, sqlalchemy.sql.sqltypes.Text
+ )
+ type_ = sqlalchemy.sql.sqltypes.JSON() if json_as_string else column.type
+ value = get_type_generator(type_)()
+ if json_as_string:
+ value = json.dumps(value)
+ return value
+
+
+def find_models(module: ModuleType) -> List[Type[Model]]:
+ """
+ Find all models in a migration script.
+ """
+ models: List[Type[Model]] = []
+ tables = extract_modified_tables(module)
+
+ # add models defined explicitly in the migration script
+ queue = list(module.__dict__.values())
+ while queue:
+ obj = queue.pop()
+ if hasattr(obj, "__tablename__"):
+ tables.add(obj.__tablename__)
+ elif isinstance(obj, list):
+ queue.extend(obj)
+ elif isinstance(obj, dict):
+ queue.extend(obj.values())
+
+ # add implicit models
+ # pylint: disable=no-member, protected-access
+ for obj in Model._decl_class_registry.values():
+ if hasattr(obj, "__table__") and obj.__table__.fullname in tables:
+ models.append(obj)
+
+ # sort topologically so we can create entities in order and
+ # maintain relationships (eg, create a database before creating
+ # a slice)
+ sorter = TopologicalSorter()
+ for model in models:
+ inspector = inspect(model)
+ dependent_tables: List[str] = []
+ for column in inspector.columns.values():
+ for foreign_key in column.foreign_keys:
+
dependent_tables.append(foreign_key.target_fullname.split(".")[0])
+ sorter.add(model.__tablename__, *dependent_tables)
+ order = list(sorter.static_order())
+ models.sort(key=lambda model: order.index(model.__tablename__))
+
+ return models
+
+
+def import_migration_script(filepath: Path) -> ModuleType:
+ """
+ Import migration script as if it were a module.
+ """
+ spec = importlib.util.spec_from_file_location(filepath.stem, filepath)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module) # type: ignore
+ return module
+
+
+def extract_modified_tables(module: ModuleType) -> Set[str]:
+ """
+ Extract the tables being modified by a migration script.
+
+ This function uses a simple approach of looking at the source code of
+ the migration script looking for patterns. It could be improved by
+ actually traversing the AST.
+ """
+
+ tables: Set[str] = set()
+ for function in {"upgrade", "downgrade"}:
+ source = getsource(getattr(module, function))
+ tables.update(re.findall(r'alter_table\(\s*"(\w+?)"\s*\)', source,
re.DOTALL))
+ tables.update(re.findall(r'add_column\(\s*"(\w+?)"\s*,', source,
re.DOTALL))
+ tables.update(re.findall(r'drop_column\(\s*"(\w+?)"\s*,', source,
re.DOTALL))
Review comment:
What about indexes or inline data modifications?
##########
File path: scripts/benchmark_migration.py
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, List, Type
+
+import click
+from flask_appbuilder import Model
+from flask_migrate import downgrade, upgrade
+from sqlalchemy.orm import Session
+
+from superset import db
+from superset.utils.data import add_to_model, find_models,
import_migration_script
+
+logger = logging.getLogger(__name__)
+
+
+@click.command()
+@click.argument("filepath")
+@click.option("--limit", default=1000, help="Maximum number of entities.")
+@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
+def main(filepath: str, limit: int = 1000, force: bool = False) -> None:
+ session = db.session()
+
+ print(f"Importing migration script: {filepath}")
+ module = import_migration_script(Path(filepath))
+
+ revision: str = getattr(module, "revision", "")
+ down_revision: str = getattr(module, "down_revision", "")
+ if not revision or not down_revision:
+ raise Exception(
+ "Not a valid migration script, couldn't find
down_revision/revision"
+ )
+
+ print(f"Migration goes from {down_revision} to {revision}")
+ current_revision = db.engine.execute(
+ "SELECT version_num FROM alembic_version"
+ ).scalar()
+ print(f"Current version of the DB is {current_revision}")
+
+ print("\nIdentifying models used in the migration:")
+ models = find_models(module)
+ model_rows: Dict[Type[Model], int] = {}
+ for model in models:
+ rows = session.query(model).count()
+ print(f"- {model.__name__} ({rows} rows in table
{model.__tablename__})")
+ model_rows[model] = rows
+
+ if not force:
+ click.confirm(
+ f"\nDowngrade DB to {down_revision} and start benchmark?",
abort=True
+ )
+ downgrade(revision=down_revision)
+
+ print("Benchmarking migration")
+ results: Dict[str, float] = {}
+ start = time.time()
+ upgrade(revision=revision)
+ duration = time.time() - start
+ results["Current"] = duration
+ print(f"Migration on current DB took: {duration:.2f} seconds")
+
+ min_entities = 10
+ new_models: Dict[Type[Model], List[Model]] = defaultdict(list)
+ while min_entities <= limit:
+ downgrade(revision=down_revision)
+ print(f"Running with at least {min_entities} entities of each model")
Review comment:
Is this running the migration for a specific number of rows, or for a
_minimum_ number of rows?
##########
File path: superset/utils/data.py
##########
@@ -161,5 +227,145 @@ def generate_data(columns: List[ColumnInfo], num_rows:
int) -> List[Dict[str, An
def generate_column_data(column: ColumnInfo, num_rows: int) -> List[Any]:
- func = get_type_generator(column["type"])
- return [func() for _ in range(num_rows)]
+ gen = get_type_generator(column["type"])
+ return [gen() for _ in range(num_rows)]
+
+
+def add_to_model(session: Session, model: Type[Model], count: int) ->
List[Model]:
+ """
+ Add entities of a given model.
+
+ :param Model model: a Superset/FAB model
+ :param int count: how many entities to generate and insert
+ """
+ inspector = inspect(model)
+
+ # select samples to copy relationship values
+ relationships = inspector.relationships.items()
+ samples = session.query(model).limit(count).all() if relationships else []
+
+ entities: List[Model] = []
+ max_primary_key: Optional[int] = None
+ for i in range(count):
+ sample = samples[i % len(samples)] if samples else None
+ kwargs = {}
+ for column in inspector.columns.values():
+ # for primary keys, keep incrementing
+ if column.primary_key:
+ if max_primary_key is None:
+ max_primary_key = (
+ session.query(func.max(getattr(model,
column.name))).scalar()
+ or 0
+ )
+ max_primary_key += 1
+ kwargs[column.name] = max_primary_key
+
+ # if the column has a foreign key, copy the value from an existing
entity
+ elif column.foreign_keys:
+ if sample:
+ kwargs[column.name] = getattr(sample, column.name)
+ else:
+ kwargs[column.name] = get_valid_foreign_key(column)
+
+ # should be an enum but it's not
+ elif column.name == "datasource_type":
Review comment:
Seems like overrides should live in config, not inline.
##########
File path: superset/utils/data.py
##########
@@ -161,5 +227,145 @@ def generate_data(columns: List[ColumnInfo], num_rows:
int) -> List[Dict[str, An
def generate_column_data(column: ColumnInfo, num_rows: int) -> List[Any]:
- func = get_type_generator(column["type"])
- return [func() for _ in range(num_rows)]
+ gen = get_type_generator(column["type"])
+ return [gen() for _ in range(num_rows)]
+
+
+def add_to_model(session: Session, model: Type[Model], count: int) ->
List[Model]:
+ """
+ Add entities of a given model.
+
+ :param Model model: a Superset/FAB model
+ :param int count: how many entities to generate and insert
+ """
+ inspector = inspect(model)
+
+ # select samples to copy relationship values
+ relationships = inspector.relationships.items()
+ samples = session.query(model).limit(count).all() if relationships else []
+
+ entities: List[Model] = []
+ max_primary_key: Optional[int] = None
+ for i in range(count):
+ sample = samples[i % len(samples)] if samples else None
+ kwargs = {}
+ for column in inspector.columns.values():
+ # for primary keys, keep incrementing
+ if column.primary_key:
+ if max_primary_key is None:
+ max_primary_key = (
+ session.query(func.max(getattr(model,
column.name))).scalar()
+ or 0
+ )
+ max_primary_key += 1
+ kwargs[column.name] = max_primary_key
+
+ # if the column has a foreign key, copy the value from an existing
entity
+ elif column.foreign_keys:
+ if sample:
+ kwargs[column.name] = getattr(sample, column.name)
+ else:
+ kwargs[column.name] = get_valid_foreign_key(column)
Review comment:
Will this work with foreign keys with uniqueness constraints?
##########
File path: scripts/benchmark_migration.py
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, List, Type
+
+import click
+from flask_appbuilder import Model
+from flask_migrate import downgrade, upgrade
+from sqlalchemy.orm import Session
+
+from superset import db
+from superset.utils.data import add_to_model, find_models,
import_migration_script
+
+logger = logging.getLogger(__name__)
+
+
+@click.command()
+@click.argument("filepath")
+@click.option("--limit", default=1000, help="Maximum number of entities.")
+@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
+def main(filepath: str, limit: int = 1000, force: bool = False) -> None:
+ session = db.session()
+
+ print(f"Importing migration script: {filepath}")
+ module = import_migration_script(Path(filepath))
+
+ revision: str = getattr(module, "revision", "")
+ down_revision: str = getattr(module, "down_revision", "")
+ if not revision or not down_revision:
+ raise Exception(
+ "Not a valid migration script, couldn't find
down_revision/revision"
+ )
+
+ print(f"Migration goes from {down_revision} to {revision}")
+ current_revision = db.engine.execute(
+ "SELECT version_num FROM alembic_version"
Review comment:
Will this always select the most recent revision on all database types?
There's also an `alembic.command.current` command.
##########
File path: superset/utils/data.py
##########
@@ -14,17 +14,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import decimal
+import importlib.util
+import json
+import logging
+import os
import random
+import re
import string
import sys
from datetime import date, datetime, time, timedelta
-from typing import Any, Callable, cast, Dict, List, Optional
+from inspect import getsource
+from pathlib import Path
+from types import ModuleType
+from typing import Any, Callable, cast, Dict, List, Optional, Set, Type
+from uuid import uuid4
import sqlalchemy.sql.sqltypes
+import sqlalchemy_utils
+from flask_appbuilder import Model
+from graphlib import TopologicalSorter # pylint: disable=wrong-import-order
Review comment:
Is this available only in Python 3.9 and later?
##########
File path: requirements/base.txt
##########
@@ -6,300 +6,299 @@
# pip-compile-multi
#
-e file:.
-# via -r requirements/base.in
-aiohttp==3.7.2
-# via slackclient
-alembic==1.4.3
-# via flask-migrate
+ # via -r requirements/base.in
+aiohttp==3.7.4.post0
+ # via slackclient
+alembic==1.5.7
+ # via flask-migrate
amqp==2.6.1
-# via kombu
+ # via kombu
apispec[yaml]==3.3.2
-# via flask-appbuilder
+ # via flask-appbuilder
async-timeout==3.0.1
-# via aiohttp
-attrs==20.2.0
-# via
-# aiohttp
-# jsonschema
-babel==2.8.0
-# via flask-babel
+ # via aiohttp
+attrs==20.3.0
+ # via
+ # aiohttp
+ # jsonschema
+babel==2.9.0
Review comment:
There are several unrelated package upgrades here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]