mistercrunch commented on code in PR #30760:
URL: https://github.com/apache/superset/pull/30760#discussion_r2023808017
##########
superset/db_engine_specs/bigquery.py:
##########
@@ -289,66 +294,115 @@ def _truncate_label(cls, label: str) -> str:
return "_" + md5_sha_from_str(label)
@classmethod
- @deprecated(deprecated_in="3.0")
- def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]:
- """
- Normalizes indexes for more consistency across db engines
+ def where_latest_partition(
+ cls,
+ database: Database,
+ table: Table,
+ query: Select,
+ columns: list[ResultSetColumnType] | None = None,
+ ) -> Select | None:
+ if partition_column := cls.get_time_partition_column(database, table):
+ max_partition_id = cls.get_max_partition_id(database, table)
+ query = query.where(
+ column(partition_column) == func.PARSE_DATE("%Y%m%d", max_partition_id)
+ )
- :param indexes: Raw indexes as returned by SQLAlchemy
- :return: cleaner, more aligned index definition
- """
- normalized_idxs = []
- # Fixing a bug/behavior observed in pybigquery==0.4.15 where
- # the index's `column_names` == [None]
- # Here we're returning only non-None indexes
- for ix in indexes:
- column_names = ix.get("column_names") or []
- ix["column_names"] = [col for col in column_names if col is not None]
- if ix["column_names"]:
- normalized_idxs.append(ix)
- return normalized_idxs
+ return query
@classmethod
- def get_indexes(
+ def get_max_partition_id(
cls,
database: Database,
- inspector: Inspector,
table: Table,
- ) -> list[dict[str, Any]]:
- """
- Get the indexes associated with the specified schema/table.
+ ) -> Select | None:
+ # Compose schema from catalog and schema
+ schema_parts = []
+ if table.catalog:
+ schema_parts.append(table.catalog)
+ if table.schema:
+ schema_parts.append(table.schema)
+ schema_parts.append("INFORMATION_SCHEMA")
+ schema = ".".join(schema_parts)
+ # Define a virtual table reference to INFORMATION_SCHEMA.PARTITIONS
+ partitions_table = sql_table(
+ "PARTITIONS",
+ sql_column("partition_id"),
+ sql_column("table_name"),
+ schema=schema,
+ )
- :param database: The database to inspect
- :param inspector: The SQLAlchemy inspector
- :param table: The table instance to inspect
- :returns: The indexes
- """
+ # Build the query
+ query = select(
+ func.max(partitions_table.c.partition_id).label("max_partition_id")
+ ).where(partitions_table.c.table_name == table.table)
+
+ # Compile to BigQuery SQL
+ compiled_query = query.compile(
+ dialect=database.get_dialect(),
+ compile_kwargs={"literal_binds": True},
+ )
+
+ # Run the query and handle result
+ df = database.get_df(str(compiled_query))
+ if df.empty or df.iat[0, 0] is None:
+ return None
+
+ return df.iat[0, 0]
Review Comment:
> We should standardize how DB engine specs run queries, BTW!
Totally, I just used `get_df` because it's an easy way to run a query...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]