maropu commented on a change in pull request #30648:
URL: https://github.com/apache/spark/pull/30648#discussion_r537916908
##########
File path:
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
##########
@@ -134,6 +134,8 @@ statement
(AS? query)?
#replaceTable
| ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
+ | ANALYZE TABLES ((FROM | IN) multipartIdentifier)? COMPUTE STATISTICS
+ (identifier)?
#analyzeTables
Review comment:
If` identifier` is only for `NOSCAN`, how about defining a ANTLR token
for that?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
##########
@@ -3547,6 +3547,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with
SQLConfHelper with Logg
}
}
+ /**
+ * Create an [[AnalyzeTables]].
+ * Example SQL for analyzing all tables in default database:
+ * {{{
+ * ANALYZE TABLES IN default COMPUTE STATISTICS;
Review comment:
`ANALYZE TABLES IN default COMPUTE STATISTICS;` -> `ANALYZE TABLES IN
multi_part_name COMPUTE STATISTICS [NOSCAN];`?
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
##########
@@ -1898,6 +1898,15 @@ class DDLParserSuite extends AnalysisTest {
"Expected `NOSCAN` instead of `xxxx`")
}
+ test("analyze tables statistics") {
+ comparePlans(parsePlan("analyze tables in a.b.c compute statistics"),
Review comment:
Please use uppercases for the SQL keywords where possible.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
##########
@@ -671,4 +671,24 @@ class StatisticsCollectionSuite extends
StatisticsCollectionTestBase with Shared
}
}
}
+
+ test("analyze all tables in a specific database") {
+ withTempDir { dir =>
+ withTable("t1", "t2") {
+ spark.range(10).write.saveAsTable("t1")
+ sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' "
+
+ "AS SELECT * FROM range(20)")
+ withView("v1") {
+ sql(s"CREATE VIEW v1 AS SELECT * FROM t1")
+ sql(s"ANALYZE TABLES IN default COMPUTE STATISTICS NOSCAN")
+ checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts =
None)
+ checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts =
None)
+
+ sql(s"ANALYZE TABLES COMPUTE STATISTICS")
Review comment:
nit: remove `s`.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+
+
+/**
+ * Analyzes all tables in the given database to generate statistics.
+ */
+case class AnalyzeTablesCommand(
+ databaseName: Option[String],
+ noScan: Boolean) extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+
+ val catalog = sparkSession.sessionState.catalog
+ val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+ catalog.listTables(db).foreach { tbl =>
+ try {
+ val tableMeta = catalog.getTableMetadata(tbl)
+ if (tableMeta.tableType == CatalogTableType.MANAGED ||
Review comment:
How about cached views? `ANALYZE TABLE` supports cached case for views:
```
scala> sql("create temporary view v as select 1")
scala> sql("analyze table v COMPUTE STATISTICS FOR ALL COLUMNS")
org.apache.spark.sql.AnalysisException: Temporary view `v` is not cached for
analyzing columns.;
at
org.apache.spark.sql.execution.command.AnalyzeColumnCommand.analyzeColumnInTempView(AnalyzeColumnCommand.scala:75)
...
scala> spark.table("v").cache()
scala> sql("analyze table v COMPUTE STATISTICS FOR ALL COLUMNS")
res3: org.apache.spark.sql.DataFrame = []
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
##########
@@ -3547,6 +3547,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with
SQLConfHelper with Logg
}
}
+ /**
+ * Create an [[AnalyzeTables]].
+ * Example SQL for analyzing all tables in default database:
+ * {{{
+ * ANALYZE TABLES IN default COMPUTE STATISTICS;
+ * }}}
+ */
+ override def visitAnalyzeTables(ctx: AnalyzeTablesContext): LogicalPlan =
withOrigin(ctx) {
+ if (ctx.identifier != null &&
+ ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+ throw new ParseException(s"Expected `NOSCAN` instead of
`${ctx.identifier.getText}`",
+ ctx.identifier())
+ }
+ val multiPart =
Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
+ AnalyzeTables(
+ UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])),
Review comment:
nit: `Seq.empty[String]` -> `Nil`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
##########
@@ -401,6 +401,13 @@ class ResolveSessionCatalog(
AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
}
+ case AnalyzeTables(SessionCatalogAndNamespace(_, ns), noScan) =>
+ if (ns.length > 1) {
+ throw new AnalysisException(
+ s"The database name is not valid: ${ns.quoted}")
Review comment:
nit: we can put this in a single line: `throw new
AnalysisException(s"The database name is not valid: ${ns.quoted}")`
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
##########
@@ -671,4 +671,24 @@ class StatisticsCollectionSuite extends
StatisticsCollectionTestBase with Shared
}
}
}
+
+ test("analyze all tables in a specific database") {
+ withTempDir { dir =>
+ withTable("t1", "t2") {
+ spark.range(10).write.saveAsTable("t1")
+ sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' "
+
+ "AS SELECT * FROM range(20)")
+ withView("v1") {
+ sql(s"CREATE VIEW v1 AS SELECT * FROM t1")
+ sql(s"ANALYZE TABLES IN default COMPUTE STATISTICS NOSCAN")
Review comment:
Also, could you add tests for the case where no database found?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+
+
+/**
+ * Analyzes all tables in the given database to generate statistics.
+ */
+case class AnalyzeTablesCommand(
+ databaseName: Option[String],
+ noScan: Boolean) extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+
+ val catalog = sparkSession.sessionState.catalog
+ val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+ catalog.listTables(db).foreach { tbl =>
+ try {
+ val tableMeta = catalog.getTableMetadata(tbl)
+ if (tableMeta.tableType == CatalogTableType.MANAGED ||
+ tableMeta.tableType == CatalogTableType.EXTERNAL) {
+ // Compute stats for the whole table
+ val newTotalSize = CommandUtils.calculateTotalSize(sparkSession,
tableMeta)
+ val tableIdentWithDB = TableIdentifier(tbl.table, Some(db))
+ val newRowCount =
+ if (noScan) None else
Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+
+ // Update the metastore if the above statistics of the table are
different from those
+ // recorded in the metastore.
+ val newStats =
+ CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize,
newRowCount)
+ if (newStats.isDefined) {
+ catalog.alterTableStats(tableIdentWithDB, newStats)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Failed to analyze table: ${tbl.identifier}.", e)
Review comment:
`AnalysisException` instead of `logError`, I think. And, we need tests
for this code path.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]