maropu commented on a change in pull request #24047: [SPARK-25196][SQL] Extends the analyze column command for cached tables
URL: https://github.com/apache/spark/pull/24047#discussion_r267608769
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
##########
@@ -39,32 +40,39 @@ case class AnalyzeColumnCommand(
     require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
       "mutually exclusive. Only one of them should be specified.")
     val sessionState = sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
-    if (tableMeta.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
-    }
-    val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
-    val relation = sparkSession.table(tableIdent).logicalPlan
-    val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
-
-    // Compute stats for the computed list of columns.
-    val (rowCount, newColStats) =
-      CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze)
-    // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = CatalogStatistics(
-      sizeInBytes = sizeInBytes,
-      rowCount = Some(rowCount),
-      // Newly computed column stats should override the existing ones.
-      colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
-
-    sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
+    tableIdent.database match {
+      case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
+        val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
+          throw new NoSuchTableException(db = db, table = tableIdent.identifier)
+        }
+        analyzeColumnInTempView(plan, sparkSession)
+      case Some(_) =>
+        analyzeColumnInCatalog(sparkSession)
+      case None =>
+        sessionState.catalog.getTempView(tableIdent.identifier) match {
+          case Some(tempView) => analyzeColumnInTempView(tempView, sparkSession)
+          case _ => analyzeColumnInCatalog(sparkSession)
+        }
+    }
     Seq.empty[Row]
   }
 
+  private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
+    val cacheManager = sparkSession.sharedState.cacheManager
+    cacheManager.lookupCachedData(plan) match {
+      case Some(cachedData) =>
+        val columnsToAnalyze = getColumnsToAnalyze(
+          tableIdent, cachedData.plan, columnNames, allColumns)
+        cacheManager.analyzeColumnCacheQuery(sparkSession, cachedData, columnsToAnalyze)
+      case _ =>
+        val catalog = sparkSession.sessionState.catalog
+        val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+        throw new NoSuchTableException(db = db, table = tableIdent.identifier)
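For context, here is a minimal, self-contained sketch of the behavior this dispatch enables, assuming the SPARK-25196 goal of computing column statistics for cached temporary views. The object name, app name, and `local[*]` master are illustrative only, not part of the PR:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver exercising the new code path.
object AnalyzeCachedViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("analyze-cached-view-sketch")
      .getOrCreate()

    // A local temp view: tableIdent.database is None, so the command checks
    // getTempView before falling back to analyzeColumnInCatalog.
    spark.range(10).createOrReplaceTempView("t")

    // Caching materializes the plan so lookupCachedData can find it; an
    // uncached temp view would hit the NoSuchTableException branch instead.
    spark.sql("CACHE TABLE t")

    // With this change, column stats are computed against the cached plan
    // rather than being rejected because the target is a view.
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS id")

    spark.stop()
  }
}
```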
Review comment:
Since the existing tests expect this exception, I kept it as is:
https://github.com/apache/spark/blob/d6ee2f331db461c1f7a25e0ef17901f53d8b707e/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala#L163
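The linked test is not reproduced here, but the expectation it encodes is presumably along these lines. A sketch only: the suite name and view name are hypothetical, and `SharedSQLContext` is assumed from the test utilities of that branch:

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite mirroring the assertion style of SQLViewSuite.
class AnalyzeViewExpectationSketch extends QueryTest with SharedSQLContext {
  test("ANALYZE ... FOR COLUMNS on a missing view throws NoSuchTableException") {
    // The command's fallback branch throws NoSuchTableException, which is
    // the exception type the existing suite already expects.
    intercept[NoSuchTableException] {
      sql("ANALYZE TABLE nonexistent_view COMPUTE STATISTICS FOR COLUMNS id")
    }
  }
}
```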