[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81768164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -581,6 +581,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val NDV_MAX_ERROR = +SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + .internal() + .doc("The maximum estimation error allowed in HyperLogLog++ algorithm.") --- End diff -- @viirya Thanks, I'll update the config description in the followup pr. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81668637 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -581,6 +581,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val NDV_MAX_ERROR = +SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + .internal() + .doc("The maximum estimation error allowed in HyperLogLog++ algorithm.") --- End diff -- > The maximum estimation error allowed in HyperLogLog++ algorithm. I mean, it sounds like this config is a general one to set the error for HyperLogLog++ algorithm in ALL scenarios.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15090
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81594675 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -581,6 +581,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val NDV_MAX_ERROR = +SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + .internal() + .doc("The maximum estimation error allowed in HyperLogLog++ algorithm.") --- End diff -- I'm slightly confused: what's the difference between what you said and what's in the doc?
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81507325 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -581,6 +581,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val NDV_MAX_ERROR = +SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + .internal() + .doc("The maximum estimation error allowed in HyperLogLog++ algorithm.") --- End diff -- This config comment is not correct. It looks like it will set up the maximum estimation error for HyperLogLog++ algorithm.
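For context on what an "estimation error" setting of this kind controls: a HyperLogLog-style estimator with m = 2^p registers has a standard error of roughly 1.04 / sqrt(m), so a target error translates into a register count. The following is a minimal sketch of that textbook relationship only; it is not taken from this pull request, it is not necessarily the formula Spark's implementation uses, and the object and method names are hypothetical.

```scala
object HllErrorSketch {
  // Smallest precision p (number of index bits) whose textbook standard
  // error, 1.04 / sqrt(2^p), does not exceed the requested maxError.
  def precisionFor(maxError: Double): Int = {
    require(maxError > 0.0 && maxError < 1.0, "maxError must be in (0, 1)")
    val p = math.ceil(2.0 * math.log(1.04 / maxError) / math.log(2.0)).toInt
    // Assumption: clamp to 4 bits (16 registers) as a practical lower bound.
    math.max(p, 4)
  }

  // Textbook standard error for a given precision p.
  def standardError(p: Int): Double = 1.04 / math.sqrt(math.pow(2.0, p))
}
```

Under this model a smaller permitted error costs memory: each halving of the error roughly quadruples the number of registers, which is why the scope of the setting (all HyperLogLog++ uses, or only column statistics) matters for its description.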
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81460994 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val duplicatedColumns = mutable.MutableList[String]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } else { +duplicatedColumns += col + } +} +if (duplicatedColumns.nonEmpty) { + logWarning(s"Duplicated columns ${duplicatedColumns.mkString("(", ", ", ")")} detected " + +s"when analyzing columns ${columnNames.mkString("(", ", ", ")")}, ignoring them.") --- End diff -- How about this? ```Scala logWarning("A duplicate column name was detected in `ANALYZE TABLE` statement. " +
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81456177 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val duplicatedColumns = mutable.MutableList[String]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } else { +duplicatedColumns += col + } +} +if (duplicatedColumns.nonEmpty) { + logWarning(s"Duplicated columns ${duplicatedColumns.mkString("(", ", ", ")")} detected " + +s"when analyzing columns ${columnNames.mkString("(", ", ", ")")}, ignoring them.") +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following eleme
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81455841 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val duplicatedColumns = mutable.MutableList[String]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } else { +duplicatedColumns += col + } +} +if (duplicatedColumns.nonEmpty) { + logWarning(s"Duplicated columns ${duplicatedColumns.mkString("(", ", ", ")")} detected " + +s"when analyzing columns ${columnNames.mkString("(", ", ", ")")}, ignoring them.") +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following eleme
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81454093 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val tableName = "tbl" + +// we need to specify column names +intercept[ParseException] { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS") +} + +val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key, value" +val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql) +val expected = AnalyzeColumnCommand(TableIdentifier(tableName), Seq("key", "value")) +comparePlans(parsed, expected) + } + + test("analyzing columns of non-atomic types is not supported") { +val tableName = "tbl" +withTable(tableName) { + Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName) + val err = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data") + } + assert(err.message.contains("Analyzing columns is not supported")) +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +withTable(table) { + sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING PARQUET") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == "Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}") +} +assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, colName2) +val tableIdent = TableIdentifier(table, Some("default")) +val relation = spark.sessionState.catalog.lookupRelation(tableIdent) +val columnStats = + AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation)._2 --- End diff -- The same here.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81454078 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} +import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, ColumnStatStruct} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with SharedSQLContext { + + def checkColStats( + df: DataFrame, + expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = { +val table = "tbl" +withTable(table) { + df.write.format("json").saveAsTable(table) + val columns = expectedColStatsSeq.map(_._1) + val tableIdent = TableIdentifier(table, Some("default")) + val relation = spark.sessionState.catalog.lookupRelation(tableIdent) + val columnStats = +AnalyzeColumnCommand(tableIdent, columns.map(_.name)).computeColStats(spark, relation)._2 --- End diff -- Nit: ```Scala val (_, columnStats) = AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation) ```
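The nit above is about readability: binding the tuple with a pattern names the element that is kept, whereas `._2` leaves the reader to look up what the second element is. A small self-contained sketch of the two styles, where `computeStats` is a hypothetical stand-in for a method that returns a row count together with per-column statistics:

```scala
object TupleDestructuringSketch {
  // Hypothetical stand-in: returns (rowCount, per-column statistic).
  def computeStats(columns: Seq[String]): (Long, Map[String, Int]) =
    (columns.size.toLong, columns.map(c => c -> c.length).toMap)

  def main(args: Array[String]): Unit = {
    // Positional access: the reader has to know what `_2` holds.
    val statsByPosition = computeStats(Seq("key", "value"))._2

    // Pattern binding: names the kept element and explicitly discards the other.
    val (_, statsByName) = computeStats(Seq("key", "value"))

    assert(statsByPosition == statsByName)
  }
}
```

Both forms evaluate to the same value; the pattern form simply documents which half of the pair the test cares about.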
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81453676 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} +import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, ColumnStatStruct} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with SharedSQLContext { + + def checkColStats( + df: DataFrame, + expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = { +val table = "tbl" +withTable(table) { + df.write.format("json").saveAsTable(table) + val columns = expectedColStatsSeq.map(_._1) + val tableIdent = TableIdentifier(table, Some("default")) + val relation = spark.sessionState.catalog.lookupRelation(tableIdent) + val columnStats = +AnalyzeColumnCommand(tableIdent, columns.map(_.name)).computeColStats(spark, relation)._2 + expectedColStatsSeq.foreach { expected => --- End diff -- Yeah, that's better.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81453655 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +38,80 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStat] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + */ +case class ColumnStat(statRow: InternalRow) { + + def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { --- End diff -- But we need to include DateType and TimestampType.
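The reply above turns on where the date and timestamp types sit in the type hierarchy: they are atomic but not numeric, so a `NumericType` bound on `forNumeric` would reject them at compile time. A simplified sketch of that situation follows; the sealed traits are a hypothetical miniature that only mirrors the names of the real types, and the method body is a placeholder.

```scala
object TypeBoundSketch {
  // Hypothetical, simplified mirror of the relevant part of the hierarchy.
  sealed trait DataType
  sealed trait AtomicType extends DataType
  sealed trait NumericType extends AtomicType
  case object IntegerType extends NumericType
  case object DoubleType extends NumericType
  case object DateType extends AtomicType
  case object TimestampType extends AtomicType
  case object StringType extends AtomicType

  // Bounded by AtomicType so that dates and timestamps are accepted too.
  def forNumeric[T <: AtomicType](dataType: T): String =
    s"min/max stats for $dataType"

  // With a NumericType bound the call below would not compile:
  //   def strict[T <: NumericType](dataType: T): String = ???
  //   strict(DateType)
}
```

The cost of the wider bound, in this miniature at least, is that it also admits types such as `StringType` for which min/max statistics may not be intended, so the method name and the bound no longer say quite the same thing.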
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81453552 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } else { +logWarning(s"Duplicated column: $col") --- End diff -- You need to explain the context. Otherwise, the log message becomes useless.
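The point about context can be illustrated with a minimal, Spark-free sketch. Everything here is a hypothetical stand-in: `DedupExample`, `resolveColumns`, the string-based `Resolver`, and the wording of the warning are assumptions for illustration, not code from the PR.

```scala
import scala.collection.mutable.ArrayBuffer

object DedupExample {
  // Stand-in for Spark's resolver: decides whether two column names match.
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Resolves the requested names against the schema, dropping duplicates.
  // Each dropped duplicate is reported through `warn`, with enough context
  // (table, requested name, resolved name) to be actionable in a log.
  def resolveColumns(
      table: String,
      schema: Seq[String],
      requested: Seq[String],
      resolver: Resolver)(warn: String => Unit): Seq[String] = {
    val resolved = ArrayBuffer.empty[String]
    requested.foreach { col =>
      val attr = schema.find(name => resolver(name, col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
      if (!resolved.contains(attr)) {
        resolved += attr
      } else {
        warn(s"Column '$col' requested for analysis on table '$table' resolves to " +
          s"'$attr', which is already in the list; ignoring the duplicate.")
      }
    }
    resolved.toList
  }
}
```

Passing the sink as a function keeps the sketch testable; in the command itself the sink would presumably be `logWarning`.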
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81450136 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -473,15 +476,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } // construct Spark's statistics from information in Hive metastore -if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) { - val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get) - // TODO: we will compute "estimatedSize" when we have column stats: - // average size of row * number of rows +if (catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).nonEmpty) { --- End diff -- Should we make `catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))` a variable? It's used twice.
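A rough sketch of the suggested refactoring, using a plain `Map[String, String]` in place of the catalog table's properties. The object name, the helper `readTotalSize`, and the concrete prefix and key strings are assumptions made for illustration; the real constants live in `HiveExternalCatalog` and may differ.

```scala
object StatsPropsExample {
  // Assumed values, standing in for STATISTICS_PREFIX and STATISTICS_TOTAL_SIZE.
  val StatisticsPrefix = "spark.sql.statistics."
  val TotalSizeKey: String = StatisticsPrefix + "totalSize"

  def readTotalSize(properties: Map[String, String]): Option[BigInt] = {
    // Filter once and reuse the result for both the emptiness check and the lookup,
    // instead of repeating the filter expression at each use site.
    val statsProps = properties.filter { case (key, _) => key.startsWith(StatisticsPrefix) }
    if (statsProps.nonEmpty) statsProps.get(TotalSizeKey).map(BigInt(_)) else None
  }
}
```

The sketch uses `filter` rather than `filterKeys` only because `filterKeys` returns a lazy view that is re-evaluated on each access (and is deprecated in Scala 2.13); binding the result to a `val` is the part that matches the review comment.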
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81450056 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val tableName = "tbl" + +// we need to specify column names +intercept[ParseException] { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS") +} + +val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key, value" +val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql) +val expected = AnalyzeColumnCommand(TableIdentifier(tableName), Seq("key", "value")) +comparePlans(parsed, expected) + } + + test("analyzing columns of non-atomic types is not supported") { +val tableName = "tbl" +withTable(tableName) { + Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3.toDF().write.saveAsTable(tableName) + val err = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data") + } + assert(err.message.contains("Analyzing columns is not supported")) +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +withTable(table) { + sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING PARQUET") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == "Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}") +} +assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, colName2) +val tableIdent = TableIdentifier(table, Some("default")) +val relation = spark.sessionState.catalog.lookupRelation(tableIdent) +val columnStats = + AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation)._2 +assert(columnStats.contains(colName1)) +assert(columnStats.contains(colName2)) +// check deduplication +assert(columnStats.size == 2) +assert(!columnStats.contains(colName2.toUpperCase)) + } +} + } + + private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = { +values.filter(_.isDefined).map(_.get) + } + + test("column-level statistics for integral type columns") { +val values = (0 to 5).map { i => + if (i % 2 == 0) None else Some(i) +} +val data = values.map { i => + (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong)) +} + +val df = data.toDF("c1", "c2", "c3", "c4") +val nonNullValues = getNonNullValues[Int](values) +val expectedColStatsSeq = df.schema.map { f => + val
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81450027 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val tableName = "tbl" + +// we need to specify column names +intercept[ParseException] { + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS") +} + +val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS key, value" +val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql) +val expected = AnalyzeColumnCommand(TableIdentifier(tableName), Seq("key", "value")) +comparePlans(parsed, expected) + } + + test("analyzing columns of non-atomic types is not supported") { +val tableName = "tbl" +withTable(tableName) { + Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3.toDF().write.saveAsTable(tableName) + val err = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data") + } + assert(err.message.contains("Analyzing columns is not supported")) +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +withTable(table) { + sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING PARQUET") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == "Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}") +} +assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, colName2) +val tableIdent = TableIdentifier(table, Some("default")) +val relation = spark.sessionState.catalog.lookupRelation(tableIdent) +val columnStats = + AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation)._2 +assert(columnStats.contains(colName1)) +assert(columnStats.contains(colName2)) +// check deduplication +assert(columnStats.size == 2) +assert(!columnStats.contains(colName2.toUpperCase)) + } +} + } + + private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = { +values.filter(_.isDefined).map(_.get) + } + + test("column-level statistics for integral type columns") { +val values = (0 to 5).map { i => + if (i % 2 == 0) None else Some(i) +} +val data = values.map { i => + (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong)) +} + +val df = data.toDF("c1", "c2", "c3", "c4") +val nonNullValues = getNonNullValues[Int](values) +val expectedColStatsSeq = df.schema.map { f => + val
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81449787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +38,80 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStat] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + */ +case class ColumnStat(statRow: InternalRow) { + + def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { --- End diff -- We have `NumericType`; can we use that as the bound instead of `AtomicType`?
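To show what the tighter bound buys, here is a small self-contained sketch. The type hierarchy below is a simplified stand-in for Spark's `AtomicType` and `NumericType`, and the one-field `NumericColumnStat` is hypothetical; only the shape of the type parameter is the point.

```scala
object BoundExample {
  // Simplified stand-ins for Spark's data type hierarchy.
  sealed trait AtomicType
  sealed trait NumericType extends AtomicType
  case object IntegerType extends NumericType
  case object StringType extends AtomicType

  // Hypothetical, reduced version of the stat wrapper.
  final case class NumericColumnStat[T <: NumericType](dataType: T)

  // With `T <: NumericType`, a call such as forNumeric(StringType) is rejected
  // at compile time; with `T <: AtomicType` it would be accepted.
  def forNumeric[T <: NumericType](dataType: T): NumericColumnStat[T] =
    NumericColumnStat(dataType)
}
```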
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81449665 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} +import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, ColumnStatStruct} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with SharedSQLContext { + + def checkColStats( + df: DataFrame, + expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = { +val table = "tbl" +withTable(table) { + df.write.format("json").saveAsTable(table) + val columns = expectedColStatsSeq.map(_._1) + val tableIdent = TableIdentifier(table, Some("default")) + val relation = spark.sessionState.catalog.lookupRelation(tableIdent) + val columnStats = +AnalyzeColumnCommand(tableIdent, columns.map(_.name)).computeColStats(spark, relation)._2 + expectedColStatsSeq.foreach { expected => --- End diff -- How about `...foreach { case (field, expectedStat) =>`? Then we can use `field.name` instead of `expected._1.name`.
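The two styles can be compared side by side in a short sketch. `Field` and `Stat` are hypothetical stand-ins for `StructField` and `ColumnStat`, and the sample values are made up.

```scala
object DestructureExample {
  // Simplified stand-ins for Spark's StructField and ColumnStat.
  final case class Field(name: String)
  final case class Stat(ndv: Long)

  val expectedColStatsSeq: Seq[(Field, Stat)] =
    Seq(Field("c1") -> Stat(3L), Field("c2") -> Stat(5L))

  // Positional access: the reader has to remember what _1 and _2 hold.
  val byPosition: Seq[String] =
    expectedColStatsSeq.map(expected => s"${expected._1.name}=${expected._2.ndv}")

  // Pattern-matching lambda: each tuple component gets a descriptive name.
  val byPattern: Seq[String] =
    expectedColStatsSeq.map { case (field, expectedStat) => s"${field.name}=${expectedStat.ndv}" }
}
```

Both forms produce the same result; the second only changes how the tuple is named at the use site.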
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81379672 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication --- End diff -- @gatorsmile OK, let me add a log for it.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81275778 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication --- End diff -- How about issuing a `logWarning`? I still think silently dropping a duplicate column is a concern.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81259988 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { +case a: AnalyzeColumnCommand => a +case o => o + } + assert(operators.size == 1) + if (operators.head.getClass != c) { --- End diff -- OK, then I'll fix it.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81259329 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { +case a: AnalyzeColumnCommand => a +case o => o + } + assert(operators.size == 1) + if (operators.head.getClass != c) { --- End diff -- I don't think this is good style. Parsing a SQL string, constructing the expected logical plan, and then comparing the two looks much clearer.
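The testing style being recommended can be sketched without Spark. The toy `parse` function and the `AnalyzeColumns` case class below are hypothetical stand-ins for `sqlParser.parsePlan` and `AnalyzeColumnCommand`; the real parser is far more general. The idea is that the test compares the whole parsed value against an expected value rather than checking only its class.

```scala
object PlanCompareExample {
  sealed trait Plan
  // Stand-in for AnalyzeColumnCommand(tableIdent, columnNames).
  final case class AnalyzeColumns(table: String, columns: List[String]) extends Plan

  // Toy parser for one statement shape; at least one column name is required.
  def parse(sql: String): Plan = {
    val Pattern = """(?i)ANALYZE TABLE (\w+) COMPUTE STATISTICS FOR COLUMNS (.+)""".r
    sql.trim match {
      case Pattern(table, cols) => AnalyzeColumns(table, cols.split(",").map(_.trim).toList)
      case _ => throw new IllegalArgumentException(s"Cannot parse: $sql")
    }
  }
}
```

Because case classes compare structurally, a single equality check covers both the node type and its arguments, which is roughly what `comparePlans(parsed, expected)` does for logical plans in the later revision of the test.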
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81259200 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { --- End diff -- What does this `collect` do?
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81259064 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStat] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. 
+ */ +case class ColumnStat(statRow: InternalRow) { + + def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { +NumericColumnStat(statRow, dataType) + } + def forString: StringColumnStat = StringColumnStat(statRow) + def forBinary: BinaryColumnStat = BinaryColumnStat(statRow) + def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow) + + override def toString: String = { +// use Base64 for encoding +Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes) } } + +object ColumnStat { + def apply(dataType: DataType, str: String): ColumnStat = { +// use Base64 for decoding +val bytes = Base64.decodeBase64(str) +val numFields = dataType match { + case BinaryType | BooleanType => 3 + case _ => 4 +} +val unsafeRow = new UnsafeRow(numFields) --- End diff -- Ah, I see. Sorry, I read the code wrong.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81243645 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(1)).toAggregateExpression() +: + attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr)) +val namedEx
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81192204 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { --- End diff -- I followed the style in `StatisticsSuite`. I think this style is OK.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81186610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala --- @@ -186,13 +187,18 @@ private[sql] class SessionState(sparkSession: SparkSession) { } /** - * Analyzes the given table in the current database to generate statistics, which will be + * Analyzes the given table in the current database to generate table-level statistics, which + * will be used in query optimizations. + */ + def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = { +AnalyzeTableCommand(tableIdent, noscan).run(sparkSession) + } + + /** + * Analyzes the given columns in the table to generate column-level statistics, which will be * used in query optimizations. - * - * Right now, it only supports catalog tables and it only updates the size of a catalog table - * in the external catalog. */ - def analyze(tableName: String, noscan: Boolean = true): Unit = { --- End diff -- `AnalyzeTableCommand` now takes a `TableIdentifier` instead of a `String` table name as its parameter, as @hvanhovell suggested in [this comment](https://github.com/apache/spark/pull/15090#r78717769).
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81183331 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication --- End diff -- Yes, hive does so. 
@hvanhovell and @gatorsmile also agreed on this.
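The column check and deduplication discussed above can be sketched outside Spark as follows. This is an illustration under stated assumptions, not the PR's code: plain strings stand in for `Attribute`, `IllegalArgumentException` stands in for `AnalysisException`, and `DedupSketch`, `Resolver`, `caseInsensitive`, and `resolveColumns` are hypothetical names.

```scala
object DedupSketch {
  // A resolver decides whether two names refer to the same column.
  type Resolver = (String, String) => Boolean

  // Assumed for illustration: a case-insensitive resolver.
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Resolve each requested name against the schema, fail on unknown names,
  // and keep only the first occurrence of each resolved column.
  def resolveColumns(
      schema: Seq[String],
      requested: Seq[String],
      resolver: Resolver): Seq[String] = {
    val resolved = requested.map { col =>
      schema
        .find(attr => resolver(attr, col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }
    resolved.distinct
  }
}
```

Under this sketch, requesting `KEY, key, value` against a schema of `key, value` analyzes each column once.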
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81183194 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStat] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. 
+ */ +case class ColumnStat(statRow: InternalRow) { + + def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { +NumericColumnStat(statRow, dataType) + } + def forString: StringColumnStat = StringColumnStat(statRow) + def forBinary: BinaryColumnStat = BinaryColumnStat(statRow) + def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow) + + override def toString: String = { +// use Base64 for encoding +Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes) } } + +object ColumnStat { + def apply(dataType: DataType, str: String): ColumnStat = { +// use Base64 for decoding +val bytes = Base64.decodeBase64(str) +val numFields = dataType match { + case BinaryType | BooleanType => 3 + case _ => 4 +} +val unsafeRow = new UnsafeRow(numFields) --- End diff -- it only returns an empty UnsafeRow:
```java
row.pointTo(new byte[numBytes], numBytes);
```
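The encode/decode pair in the diff above relies on a Base64 round trip of the row's bytes. A minimal, Spark-free sketch of that round trip is shown below. It assumes `java.util.Base64` in place of the `Base64` helper used in the diff, and uses plain type-name strings in place of `DataType`; `StatCodecSketch` and its members are hypothetical names.

```scala
import java.util.Base64

object StatCodecSketch {
  // Encode raw bytes as a Base64 string, as ColumnStat.toString does for its row bytes.
  def encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

  // Decode the string back to the original bytes.
  def decode(str: String): Array[Byte] = Base64.getDecoder.decode(str)

  // Mirrors the field counts in the diff: 3 for binary/boolean columns, 4 otherwise.
  def numFields(typeName: String): Int = typeName match {
    case "binary" | "boolean" => 3
    case _ => 4
  }
}
```

The decoded bytes alone do not carry the field count, which is why the diff derives `numFields` from the data type before constructing the row.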
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81103501 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { +case a: AnalyzeColumnCommand => a +case o => o + } + assert(operators.size == 1) + if (operators.head.getClass != c) { +fail( + s"""$analyzeCommand expected command: $c, but got ${operators.head} + |parsed command: + |$parsed + """.stripMargin) + } +} + +val table = "table" +assertAnalyzeColumnCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[ParseException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("analyzing columns of non-atomic types is not supported") { +val tableName = "tbl" +withTable(tableName) { + Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName) + val err = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS data") + } + assert(err.message.contains("Analyzing columns is not supported")) +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +val quotedColName2 = s"`$colName2`" --- End diff -- You can inline this variable, i.e. ``CREATE TABLE $table ($colName1 int, `$colName2` string)``
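The inlining suggested above works because a backtick is not part of a Scala identifier, so `$colName2` can sit directly between backticks inside an `s"..."` string. A small sketch, where `QuotedColumnSketch` and `createTableSql` are hypothetical names used only for illustration:

```scala
object QuotedColumnSketch {
  // Build the DDL with the quoted column inlined instead of a separate quotedColName2 variable.
  def createTableSql(table: String, colName1: String, colName2: String): String =
    s"CREATE TABLE $table ($colName1 int, `$colName2` string)"
}
```

With the values from the test (`tbl`, `abc`, `x.yz`), this produces the statement with `x.yz` wrapped in backticks.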
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81103116 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.test.SQLTestData.ArrayData +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { --- End diff -- Please follow the style in `DDLCommandSuite` when writing tests for parser rules.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81102672 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -581,6 +581,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val NDV_MAX_ERROR = +SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + .internal() + .doc("The maximum estimation error allowed in HyperLogLog++ algorithm.") --- End diff -- Should we mention that this conf is only used to generate column-level statistics?
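As background on what a maximum estimation error means for a HyperLogLog-style sketch: the textbook approximation is that the relative standard error is about 1.04 / sqrt(m), where m = 2^p is the number of registers. The sketch below turns a target error into a precision p under that approximation. It is an assumption-laden illustration; the constant and rounding used by Spark's HyperLogLog++ implementation may differ, and `HllErrorSketch`, `relativeError`, and `precisionFor` are hypothetical names.

```scala
object HllErrorSketch {
  // Textbook approximation: relative standard error ~ 1.04 / sqrt(m), with m = 2^p registers.
  def relativeError(p: Int): Double = 1.04 / math.sqrt(math.pow(2.0, p))

  // Smallest p whose approximate error does not exceed maxError.
  def precisionFor(maxError: Double): Int = {
    require(maxError > 0.0, "maxError must be positive")
    math.ceil(2.0 * math.log(1.04 / maxError) / math.log(2.0)).toInt
  }
}
```

For an example target of 0.05, this gives p = 9 (512 registers), since 8 bits would leave the approximate error at 0.065. A tighter error bound therefore costs more memory per analyzed column, which is one reason to scope the setting to column-level statistics.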
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81102487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(1)).toAggregateExpression() +: + attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr)) +val nam
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81102018 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(1)).toAggregateExpression() +: + attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr)) +val nam
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81100678 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table to generate statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val resolver = sparkSession.sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.output.find(attr => resolver(attr.name, col)) + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication --- End diff -- Is this common behaviour? That is, if users specify duplicated columns, do we just deduplicate silently instead of reporting an error?
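The two behaviours raised in this comment (silently deduplicating versus reporting an error) can be sketched roughly as follows. This is an illustration only: `resolveColumns`, the `Resolver` type alias, the `failOnDuplicate` flag, and the use of plain strings in place of `Attribute` are hypothetical stand-ins, not code from the PR.

```scala
object ColumnDedupSketch {
  // Hypothetical stand-in for Spark's resolver: decides whether two names match.
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)
  val caseSensitive: Resolver = (a, b) => a == b

  // Resolves each requested name against the table's columns, preserving order.
  // Unknown names always fail; duplicates fail only when failOnDuplicate is set,
  // otherwise they are dropped silently (the behaviour in the diff).
  def resolveColumns(
      tableColumns: Seq[String],
      requested: Seq[String],
      resolver: Resolver,
      failOnDuplicate: Boolean): Seq[String] = {
    requested.foldLeft(Vector.empty[String]) { (acc, col) =>
      val resolved = tableColumns.find(resolver(_, col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
      if (!acc.contains(resolved)) acc :+ resolved
      else if (failOnDuplicate) {
        throw new IllegalArgumentException(s"Duplicate column name: $col.")
      } else acc
    }
  }
}
```

Note that with a case-insensitive resolver, `ID` and `id` resolve to the same column, so a strict mode would have to decide whether that counts as a duplicate as well.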
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r81100194 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStat] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. 
+ */ +case class ColumnStat(statRow: InternalRow) { + + def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { +NumericColumnStat(statRow, dataType) + } + def forString: StringColumnStat = StringColumnStat(statRow) + def forBinary: BinaryColumnStat = BinaryColumnStat(statRow) + def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow) + + override def toString: String = { +// use Base64 for encoding +Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes) } } + +object ColumnStat { + def apply(dataType: DataType, str: String): ColumnStat = { +// use Base64 for decoding +val bytes = Base64.decodeBase64(str) +val numFields = dataType match { + case BinaryType | BooleanType => 3 + case _ => 4 +} +val unsafeRow = new UnsafeRow(numFields) --- End diff -- we can just write `UnsafeRow.createFromBytesArray(bytes, numFields)`
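For readers following the `toString`/`apply` pair in the diff above, the Base64 round trip and the field-count selection can be sketched on their own. This is a simplified sketch under stated assumptions: it uses `java.util.Base64` from the JDK rather than the commons-codec `Base64` class the diff calls, it works on raw byte arrays instead of an `UnsafeRow`, and `ColKind` and its cases are hypothetical stand-ins for Spark's `DataType` hierarchy.

```scala
import java.util.Base64

object StatStringSketch {
  // Hypothetical column kinds, standing in for Spark's DataType hierarchy.
  sealed trait ColKind
  case object BinaryKind extends ColKind
  case object BooleanKind extends ColKind
  case object OtherKind extends ColKind

  // Mirrors the match in the diff: binary and boolean stats have 3 fields, all others 4.
  def numFields(kind: ColKind): Int = kind match {
    case BinaryKind | BooleanKind => 3
    case _ => 4
  }

  // Serialise the row bytes to a string that can be stored as a table property.
  def encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

  // Recover the original bytes from the stored string.
  def decode(str: String): Array[Byte] = Base64.getDecoder.decode(str)
}
```

The point of the round trip is that the decoded bytes alone do not say how many fields the row has, which is why the diff derives `numFields` from the column's data type before rebuilding the row.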
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80763362 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80761163 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Lit
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15090#discussion_r80755706

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
    @@ -693,6 +701,7 @@ object HiveExternalCatalog {
       val STATISTICS_PREFIX = "spark.sql.statistics."
       val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
       val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
    +  val STATISTICS_BASIC_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
    --- End diff --

    Oh, sorry I forgot to change the name here.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80753145 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80639732 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) --- End diff -- ah i see
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15090#discussion_r80639243

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
    @@ -473,15 +476,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           }
         }
         // construct Spark's statistics from information in Hive metastore
    -    if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
    -      val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
    -      // TODO: we will compute "estimatedSize" when we have column stats:
    -      // average size of row * number of rows
    +    if (catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).nonEmpty) {
    +      val colStatsProps = catalogTable.properties
    +        .filterKeys(_.startsWith(STATISTICS_BASIC_COL_STATS_PREFIX))
    +        .map { case (k, v) => (k.replace(STATISTICS_BASIC_COL_STATS_PREFIX, ""), v) }
    --- End diff --

    I think it's safer to do `k.drop(STATISTICS_BASIC_COL_STATS_PREFIX.length)`
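To illustrate why dropping the prefix by length may be safer than `replace` here: `String.replace` substitutes every occurrence of the target, not only the leading one, so a column name that happens to contain the prefix text would be altered. A minimal Scala sketch follows; the property map, the object name, and the local `ColStatsPrefix` constant are assumptions made up for this example (the prefix value is composed from the constants shown in the HiveExternalCatalog diff).

```scala
object PrefixStripSketch {
  // Local stand-in for STATISTICS_BASIC_COL_STATS_PREFIX (assumed value).
  val ColStatsPrefix: String = "spark.sql.statistics." + "colStats."

  // Hypothetical table properties; the second column name deliberately
  // contains the prefix text a second time.
  val props: Map[String, String] = Map(
    ColStatsPrefix + "age" -> "stat-a",
    ColStatsPrefix + "x." + ColStatsPrefix + "y" -> "stat-b",
    "spark.sql.statistics.totalSize" -> "1024")

  // replace() removes every occurrence of the prefix, including inner ones.
  def viaReplace(ps: Map[String, String]): Map[String, String] =
    ps.filter { case (k, _) => k.startsWith(ColStatsPrefix) }
      .map { case (k, v) => (k.replace(ColStatsPrefix, ""), v) }

  // drop(prefix.length) removes only the leading prefix characters.
  def viaDrop(ps: Map[String, String]): Map[String, String] =
    ps.filter { case (k, _) => k.startsWith(ColStatsPrefix) }
      .map { case (k, v) => (k.drop(ColStatsPrefix.length), v) }

  def main(args: Array[String]): Unit = {
    println(viaReplace(props).keySet) // inner prefix occurrence is lost
    println(viaDrop(props).keySet)    // column name is preserved as written
  }
}
```

With these inputs, `viaReplace` turns the second key into `x.y`, while `viaDrop` keeps `x.spark.sql.statistics.colStats.y`. Because the keys are already filtered with `startsWith`, `k.stripPrefix(ColStatsPrefix)` would behave the same as `drop` in this sketch.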
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15090#discussion_r80638681

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
    @@ -186,13 +187,18 @@ private[sql] class SessionState(sparkSession: SparkSession) {
       }

       /**
    -   * Analyzes the given table in the current database to generate statistics, which will be
    +   * Analyzes the given table in the current database to generate table-level statistics, which
    +   * will be used in query optimizations.
    +   */
    +  def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
    +    AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
    +  }
    +
    +  /**
    +   * Analyzes the given columns in the table to generate column-level statistics, which will be
        * used in query optimizations.
    -   *
    -   * Right now, it only supports catalog tables and it only updates the size of a catalog table
    -   * in the external catalog.
        */
    -  def analyze(tableName: String, noscan: Boolean = true): Unit = {
    --- End diff --

    Actually, this method is never used. We should keep it unchanged in this PR and remove it in a follow-up.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80638078 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Lite
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80637650 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Lite
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80633172 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis --- End diff -- nice! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80632476 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) --- End diff -- We need to refresh the `stats` property of `catalogTable` in cached data source tables --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80630427 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis --- End diff -- ``` val resolver = sparkSession.sessionState.conf.resolver ... 
relation.output.find(attr => resolver(attr.name, col)) ```
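The suggestion above replaces the explicit `caseSensitive` branch with a resolver function. A minimal, Spark-free sketch of that idea follows. `Attr`, `resolveColumns`, and the two resolver values are hypothetical stand-ins written for illustration, not Spark's actual classes; the only thing taken from the thread is the shape `resolver(attr.name, col)`.

```scala
object ResolverSketch {
  // Assumed shape of a resolver: decides whether two identifiers refer to the same column.
  type Resolver = (String, String) => Boolean

  // Hypothetical resolvers; the real one would come from the session configuration.
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Hypothetical stand-in for an attribute in relation.output.
  final case class Attr(name: String)

  // Looks up every requested column with the resolver and drops duplicates.
  // Returns Left with an error message for the first column that cannot be found.
  def resolveColumns(
      output: Seq[Attr],
      columnNames: Seq[String],
      resolver: Resolver): Either[String, Seq[Attr]] = {
    val resolved = columnNames.map { col =>
      output.find(attr => resolver(attr.name, col)).toRight(s"Invalid column name: $col.")
    }
    resolved.collectFirst { case Left(msg) => msg } match {
      case Some(msg) => Left(msg)
      case None => Right(resolved.collect { case Right(attr) => attr }.distinct)
    }
  }

  def main(args: Array[String]): Unit = {
    val output = Seq(Attr("id"), Attr("Name"))
    // Case-insensitive lookup: "name" and "NAME" both match Attr("Name") and are deduplicated.
    println(resolveColumns(output, Seq("name", "NAME", "id"), caseInsensitive))
    // Case-sensitive lookup: "name" does not match "Name", so an error is returned.
    println(resolveColumns(output, Seq("name"), caseSensitive))
  }
}
```

With a resolver, the calling code no longer needs to know which case-sensitivity mode is active; it only asks whether two names match.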
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80629937 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) --- End diff -- why do we need to refresh table for statistics updating? AFAIK, refresh table is used when the table data files changed, and we need to list files again and invalidate table cache. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80629684 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -87,19 +87,27 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: + * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. + * Example SQL for analyzing table : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; + * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN]; + * }}} + * Example SQL for analyzing columns : + * {{{ + * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2; * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { if (ctx.partitionSpec == null && ctx.identifier != null && ctx.identifier.getText.toLowerCase == "noscan") { - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString) + AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier)) +} else if (ctx.identifierSeq() == null) { + AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false) --- End diff -- This branch handles analyzing a table without `noscan`.
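The three branches of `visitAnalyze` discussed here can be sketched without the parser. Everything below is a hypothetical simplification: `AnalyzeCtx` stands in for the ANTLR context, and the default `noscan = true` on `AnalyzeTable` is an assumption inferred from the diff, where the NOSCAN branch passes no second argument.

```scala
object AnalyzeDispatchSketch {
  sealed trait Command
  // Assumption: noscan defaults to true, mirroring the one-argument call in the diff.
  final case class AnalyzeTable(table: String, noscan: Boolean = true) extends Command
  final case class AnalyzeColumn(table: String, columns: Seq[String]) extends Command

  // Hypothetical stand-in for the parsed ANALYZE statement.
  final case class AnalyzeCtx(
      table: String,
      hasPartitionSpec: Boolean,
      identifier: Option[String],   // trailing keyword such as NOSCAN, if any
      columns: Option[Seq[String]]) // FOR COLUMNS list, if any

  def visitAnalyze(ctx: AnalyzeCtx): Command = {
    val isNoscan =
      !ctx.hasPartitionSpec && ctx.identifier.exists(_.toLowerCase == "noscan")
    if (isNoscan) {
      // ANALYZE TABLE t COMPUTE STATISTICS NOSCAN
      AnalyzeTable(ctx.table)
    } else ctx.columns match {
      // ANALYZE TABLE t COMPUTE STATISTICS (no NOSCAN, no column list)
      case None => AnalyzeTable(ctx.table, noscan = false)
      // ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1, c2
      case Some(cols) => AnalyzeColumn(ctx.table, cols)
    }
  }

  def main(args: Array[String]): Unit = {
    println(visitAnalyze(AnalyzeCtx("t", hasPartitionSpec = false, Some("NOSCAN"), None)))
    println(visitAnalyze(AnalyzeCtx("t", hasPartitionSpec = false, None, None)))
    println(visitAnalyze(AnalyzeCtx("t", hasPartitionSpec = false, None, Some(Seq("c1", "c2")))))
  }
}
```

The middle case is the branch the question refers to: a plain `COMPUTE STATISTICS` with neither `NOSCAN` nor a column list.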
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80629335 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, --- End diff -- `... in the current database ...` users can specify the database in table name right? 
I think we can just say `given table`.
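The point about the database can be illustrated with a small sketch of how the command in the diff qualifies the table name: an explicitly given database wins, and the current database is only a fallback. `TableIdent` and `qualify` are hypothetical names used for illustration; the `getOrElse` fallback is the pattern taken from the quoted `run` method.

```scala
object TableIdentSketch {
  // Hypothetical stand-in for a table identifier with an optional database part.
  final case class TableIdent(table: String, database: Option[String] = None)

  // Use the database from the identifier if present, otherwise the current database.
  def qualify(ident: TableIdent, currentDb: String): TableIdent =
    TableIdent(ident.table, Some(ident.database.getOrElse(currentDb)))

  def main(args: Array[String]): Unit = {
    // No database given: falls back to the current database.
    println(qualify(TableIdent("sales"), currentDb = "default"))
    // Database given in the table name: the current database is ignored.
    println(qualify(TableIdent("sales", Some("warehouse")), currentDb = "default"))
  }
}
```

Because the second case is possible, describing the command as analyzing a table "in the current database" is narrower than what the code does.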
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80629224 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -87,19 +87,27 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: + * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. + * Example SQL for analyzing table : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; + * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN]; + * }}} + * Example SQL for analyzing columns : + * {{{ + * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2; * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { if (ctx.partitionSpec == null && ctx.identifier != null && ctx.identifier.getText.toLowerCase == "noscan") { - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString) + AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier)) +} else if (ctx.identifierSeq() == null) { + AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false) --- End diff -- when will we hit this branch?
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80628790 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java --- @@ -31,6 +31,7 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.apache.spark.sql.catalyst.plans.logical.Except; --- End diff -- unnecessary change
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80626981 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Literal(
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80615261 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStat]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(Lit
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80165466

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStats] = Map.empty,
     isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"

   /** Readable string representation for the Statistics. */
   def simpleString: String = {
     Seq(s"sizeInBytes=$sizeInBytes",
       if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+      if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

BTW, I've created a jira for this. We can fix it when column-level statistics including histograms are supported.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80161814

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStats] = Map.empty,
     isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"

   /** Readable string representation for the Statistics. */
   def simpleString: String = {
     Seq(s"sizeInBytes=$sizeInBytes",
       if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+      if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

I think it's ok for now, because we don't have other ways to see the column stats. We can delete this when we support `DESC FORMATTED TABLE COLUMN` command, which shows column-level stats (including histograms?) of the given column.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80156525

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStats] = Map.empty,
     isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"

   /** Readable string representation for the Statistics. */
   def simpleString: String = {
     Seq(s"sizeInBytes=$sizeInBytes",
       if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+      if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

I am not sure whether we should do this. Hive does not show it in the `DESC FORMATTED/EXTENDED`, right? The string could be super long when the table has many columns.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r80097979 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+      sessionState.catalog.refreshTable(tableIdentWithDB)
+    }
+
+    Seq.empty[Row]
+  }
+
+  def computeColStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan): (Long, Map[String, ColumnStats]) = {
+
+    // check correctness of column names
+    val attributesToAnalyze = mutable.MutableList[Attribute]()
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    columnNames.foreach { col =>
+      val exprOption = relation.output.find { attr =>
+        if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
+      }
+      val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+      // do deduplication
+      if (!attributesToAnalyze.contains(expr)) {
--- End diff --

@gatorsmile There's no need to handle case sensitivity here, because `attributesToAnalyze` holds the resolved attributes, whose names come from the schema. For example, if the column name in the schema is "abc", the requested columns "abc" and "ABC" both resolve to an expr whose name is "abc".
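The point in the reply above can be checked without a Spark session. The following is a minimal sketch, not the PR's code: `Attr` is a hypothetical stand-in for Catalyst's `Attribute`, and `DedupSketch.resolve` is an illustrative name. It assumes, as the quoted diff does, that each requested column is first resolved against the schema and that deduplication then happens on the resolved attribute.

```scala
// Hypothetical stand-in for Catalyst's Attribute; only the name matters here.
final case class Attr(name: String)

object DedupSketch {
  /** Resolve each requested column against the schema, then drop duplicates. */
  def resolve(schema: Seq[Attr], requested: Seq[String], caseSensitive: Boolean): Seq[Attr] = {
    val resolved = requested.map { col =>
      schema
        .find { attr =>
          if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
        }
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }
    // Deduplicate on the resolved attribute, not on the user-supplied string,
    // so "abc" and "ABC" collapse to one entry under case-insensitive analysis.
    resolved.distinct
  }
}
```

With a schema of `Seq(Attr("abc"), Attr("x.yz"))`, requesting `Seq("abc", "ABC")` with `caseSensitive = false` yields a single `Attr("abc")`; with `caseSensitive = true` the lookup for "ABC" fails before deduplication is reached.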
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79975372 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+      sessionState.catalog.refreshTable(tableIdentWithDB)
+    }
+
+    Seq.empty[Row]
+  }
+
+  def computeColStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan): (Long, Map[String, ColumnStats]) = {
+
+    // check correctness of column names
+    val attributesToAnalyze = mutable.MutableList[Attribute]()
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    columnNames.foreach { col =>
+      val exprOption = relation.output.find { attr =>
+        if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
+      }
+      val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+      // do deduplication
+      if (!attributesToAnalyze.contains(expr)) {
--- End diff --

Deduplication lacks case sensitivity handling.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79975005 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, Statistics} +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with SharedSQLContext { + + def checkColStats( + df: DataFrame, + expectedColStatsSeq: Seq[(String, ColumnStats)]): Unit = { +val table = "tbl" +withTable(table) { + df.write.format("json").saveAsTable(table) + val columns = expectedColStatsSeq.map(_._1) + val tableIdent = TableIdentifier(table, Some("default")) + val relation = spark.sessionState.catalog.lookupRelation(tableIdent) + val columnStats = +AnalyzeColumnCommand(tableIdent, columns).computeColStats(spark, relation)._2 + expectedColStatsSeq.foreach { expected => +assert(columnStats.contains(expected._1)) +checkColStats(colStats = columnStats(expected._1), expectedColStats = expected._2) + } +} + } + + def checkColStats(colStats: ColumnStats, expectedColStats: ColumnStats): Unit = { +assert(colStats.dataType == expectedColStats.dataType) +assert(colStats.numNulls == expectedColStats.numNulls) +colStats.dataType match { + case _: IntegralType | DateType | TimestampType => +assert(colStats.max.map(_.toString.toLong) == expectedColStats.max.map(_.toString.toLong)) +assert(colStats.min.map(_.toString.toLong) == expectedColStats.min.map(_.toString.toLong)) + case _: FractionalType => +assert(colStats.max.map(_.toString.toDouble) == expectedColStats + .max.map(_.toString.toDouble)) +assert(colStats.min.map(_.toString.toDouble) == expectedColStats + .min.map(_.toString.toDouble)) + case _ => +// other types don't have max and min stats +assert(colStats.max.isEmpty) +assert(colStats.min.isEmpty) +} +colStats.dataType match { + case BinaryType | BooleanType => 
+        assert(colStats.ndv.isEmpty)
+      case _ =>
+        // ndv is an approximate value, so we make sure we have the value, and it should be
+        // within 3*SD's of the given rsd.
+        assert(colStats.ndv.get >= 0)
+        if (expectedColStats.ndv.get == 0) {
+          assert(colStats.ndv.get == 0)
+        } else if (expectedColStats.ndv.get > 0) {
+          val rsd = spark.sessionState.conf.ndvMaxError
+          val error = math.abs((colStats.ndv.get / expectedColStats.ndv.get.toDouble) - 1.0d)
+          assert(error <= rsd * 3.0d, "Error should be within 3 std. errors.")
+        }
+    }
+    assert(colStats.avgColLen == expectedColStats.avgColLen)
+    assert(colStats.maxColLen == expectedColStats.maxColLen)
+    assert(colStats.numTrues == expectedColStats.numTrues)
+    assert(colStats.numFalses == expectedColStats.numFalses)
+  }
+
+  def checkTableStats(tableName: String, expectedRowCount: Option[Int]): Option[Statistics] = {
+    val df = sql(s"SELECT * FROM $tableName")
--- End diff --

```Scala
val df = spark.table(tableName)
```
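The NDV tolerance check in the quoted test helper can be tried in isolation. This is a sketch under stated assumptions: `NdvCheck`, `relativeError`, and `withinTolerance` are hypothetical names, and the `rsd` value used in the usage note is illustrative rather than the default of `spark.sql.statistics.ndv.maxError`. The arithmetic mirrors the quoted assertion: the relative error of the estimate must not exceed three times the configured relative standard deviation.

```scala
object NdvCheck {
  /** Relative error of an approximate distinct count against the exact value. */
  def relativeError(estimated: Long, expected: Long): Double =
    math.abs(estimated / expected.toDouble - 1.0d)

  /**
   * Mirrors the quoted test logic: an expected count of zero must be matched
   * exactly; otherwise the estimate must lie within 3 * rsd of the exact value.
   */
  def withinTolerance(estimated: Long, expected: Long, rsd: Double): Boolean =
    if (expected == 0) estimated == 0
    else relativeError(estimated, expected) <= rsd * 3.0d
}
```

For example, with `rsd = 0.05` an estimate of 1040 against an exact count of 1000 passes (relative error about 0.04), while an estimate of 1200 does not (relative error about 0.2).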
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79974658

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -473,15 +476,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
     }
     // construct Spark's statistics from information in Hive metastore
-    if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
-      val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
-      // TODO: we will compute "estimatedSize" when we have column stats:
-      // average size of row * number of rows
+    if (catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).nonEmpty) {
+      val colStatsProps = catalogTable.properties
+        .filterKeys(_.startsWith(STATISTICS_BASIC_COL_STATS_PREFIX))
+        .map { case (k, v) => (k.replace(STATISTICS_BASIC_COL_STATS_PREFIX, ""), v)}
--- End diff --

Add a space between `)` and `}`.
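For readers following the hunk above, the prefix-based extraction of column statistics from table properties can be sketched in plain Scala. This is an illustration, not the PR's code: the prefix values below are hypothetical (the real constants live in `HiveExternalCatalog` and are not shown in the quoted hunk), and the sketch swaps `filterKeys` plus `replace` for `collect` plus `stripPrefix`, since `String.replace` removes every occurrence of the prefix text while `stripPrefix` removes only the leading one.

```scala
object ColStatsProps {
  // Hypothetical prefix values, chosen only for this example.
  val StatisticsPrefix = "spark.sql.statistics."
  val ColStatsPrefix: String = StatisticsPrefix + "colStats."

  /** Keep only column-stats properties and strip the prefix from their keys. */
  def extract(props: Map[String, String]): Map[String, String] =
    props.collect {
      case (k, v) if k.startsWith(ColStatsPrefix) => (k.stripPrefix(ColStatsPrefix), v)
    }
}
```

Given properties containing a key under the column-stats prefix, a table-level statistics key, and an unrelated key, `extract` returns only the first, keyed by the bare column name.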
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79974623 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val (rowCount, columnStats) = computeColStats(sparkSession, relation) + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) +} + +Seq.empty[Row] + } + + def computeColStats( + sparkSession: SparkSession, + relation: LogicalPlan): (Long, Map[String, ColumnStats]) = { + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +// Collect statistics per column. +// The first element in the result will be the overall row count, the following elements +// will be structs containing all column stats. +// The layout of each struct follows the layout of the ColumnStats. +val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError +val expressions = Count(L
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79968621 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { +case a: AnalyzeColumnCommand => a +case o => o + } + assert(operators.size == 1) + if (operators.head.getClass != c) { +fail( + s"""$analyzeCommand expected command: $c, but got ${operators.head} + |parsed command: + |$parsed + """.stripMargin) + } +} + +val table = "table" +assertAnalyzeColumnCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[ParseException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +val quotedColName2 = s"`$colName2`" +withTable(table) { + sql(s"CREATE TABLE $table ($colName1 int, $quotedColName2 string) USING PARQUET") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == "Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${colName1.toUpperCase}") +} +assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val columnsToAnalyze = Seq(colName2.toUpperCase, 
colName1, colName2) +val columnStats = spark.sessionState.computeColumnStats(table, columnsToAnalyze) --- End diff -- Thanks!
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79965497 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with SharedSQLContext { + + def checkColStats( + df: DataFrame, + expectedColStatsSeq: Seq[(String, ColumnStats)]): Unit = { +val table = "tbl" +withTable(table) { + df.write.format("json").saveAsTable(table) + val columns = expectedColStatsSeq.map(_._1) + val columnStats = spark.sessionState.computeColumnStats(table, columns) --- End diff -- Change this too.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79965425 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala --- @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) { + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) + val operators = parsed.collect { +case a: AnalyzeColumnCommand => a +case o => o + } + assert(operators.size == 1) + if (operators.head.getClass != c) { +fail( + s"""$analyzeCommand expected command: $c, but got ${operators.head} + |parsed command: + |$parsed + """.stripMargin) + } +} + +val table = "table" +assertAnalyzeColumnCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[ParseException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val colName1 = "abc" +val colName2 = "x.yz" +val quotedColName2 = s"`$colName2`" +withTable(table) { + sql(s"CREATE TABLE $table ($colName1 int, $quotedColName2 string) USING PARQUET") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == "Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${colName1.toUpperCase}") +} +assert(invalidErr.message == s"Invalid column name: ${colName1.toUpperCase}.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val columnsToAnalyze = Seq(colName2.toUpperCase, 
colName1, colName2) +val columnStats = spark.sessionState.computeColumnStats(table, columnsToAnalyze) --- End diff -- Here, you can just replace it with:
```Scala
val tableIdent = TableIdentifier(table, Option("default"))
val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
val columnStats = AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation)._2
```
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79965370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala --- @@ -186,13 +187,27 @@ private[sql] class SessionState(sparkSession: SparkSession) { } /** - * Analyzes the given table in the current database to generate statistics, which will be + * Analyzes the given table in the current database to generate table-level statistics, which + * will be used in query optimizations. + */ + def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = { +AnalyzeTableCommand(tableIdent, noscan).run(sparkSession) + } + + /** + * Analyzes the given columns in the table to generate column-level statistics, which will be * used in query optimizations. - * - * Right now, it only supports catalog tables and it only updates the size of a catalog table - * in the external catalog. */ - def analyze(tableName: String, noscan: Boolean = true): Unit = { -AnalyzeTableCommand(tableName, noscan).run(sparkSession) + def analyzeTableColumns(tableIdent: TableIdentifier, columnNames: Seq[String]): Unit = { +AnalyzeColumnCommand(tableIdent, columnNames).run(sparkSession) + } + + // This api is used for testing. + def computeColumnStats(tableName: String, columnNames: Seq[String]): Map[String, ColumnStats] = { --- End diff -- Avoid adding any testing-only API, if possible.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79926080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +c
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79925839 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis --- End diff -- See if [the comment here](https://github.com/apache/spark/pull/15090#discussion_r79329382) can answer your concern.
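The column-name check quoted in the diff above (match each requested name against the relation's output, honouring `caseSensitive`, then deduplicate) can be sketched without Spark as follows. This is only an illustration: `ColumnResolution` and `resolveColumns` are hypothetical names, plain strings stand in for `Attribute`, and `IllegalArgumentException` stands in for `AnalysisException`.

```Scala
// Hypothetical stand-alone sketch of the name check in AnalyzeColumnCommand.run.
object ColumnResolution {

  /**
   * Resolves each requested column name against the schema and removes duplicates,
   * keeping the order in which columns were first requested.
   */
  def resolveColumns(
      schema: Seq[String],
      requested: Seq[String],
      caseSensitive: Boolean): Seq[String] = {
    val resolved = requested.map { col =>
      schema
        .find(name => if (caseSensitive) name == col else name.equalsIgnoreCase(col))
        // IllegalArgumentException is a stand-in for Spark's AnalysisException.
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }
    // "X.YZ" and "x.yz" resolve to the same schema column when case-insensitive.
    resolved.distinct
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("abc", "x.yz")
    println(resolveColumns(schema, Seq("X.YZ", "abc", "x.yz"), caseSensitive = false))
  }
}
```

With case sensitivity switched off, the three requested names from the test in this thread collapse to two resolved columns; with it switched on, an upper-cased name is rejected.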
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79925464 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. 
+ */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None, +min: Option[Any] = None, +ndv: Option[Long] = None, +avgColLen: Option[Double] = None, +maxColLen: Option[Long] = None, +numTrues: Option[Long] = None, +numFalses: Option[Long] = None) { + + override def toString: String = "ColumnStats(" + simpleString + ")" + + def simpleString: String = { +Seq(s"numNulls=$numNulls", + if (max.isDefined) s"max=${max.get}" else "", + if (min.isDefined) s"min=${min.get}" else "", + if (ndv.isDefined) s"ndv=${ndv.get}" else "", + if (avgColLen.isDefined) s"avgColLen=${avgColLen.get}" else "", + if (maxColLen.isDefined) s"maxColLen=${maxColLen.get}" else "", + if (numTrues.isDefined) s"numTrues=${numTrues.get}" else "", + if (numFalses.isDefined) s"numFalses=${numFalses.get}" else "" +).filter(_.nonEmpty).mkString(", ") + } +} + +object ColumnStats { + def fromString(str: String, dataType: DataType): ColumnStats = { --- End diff -- ok
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79925435 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. + */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None, +min: Option[Any] = None, +ndv: Option[Long] = None, +avgColLen: Option[Double] = None, +maxColLen: Option[Long] = None, +numTrues: Option[Long] = None, --- End diff -- For the boolean type we don't need histograms; we can just use these two values (numTrues and numFalses).
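To illustrate the point about booleans: with only two possible non-null values, the two counters already describe the full value distribution, so the fraction of rows matching `col = true` or `col = false` follows directly. The sketch below is an assumption-laden illustration, not code from the pull request; `BooleanColumnStats` and `selectivity` are hypothetical names, and the row count is derived here from the three counters rather than taken from table-level statistics.

```Scala
// Hypothetical, simplified stand-in for the boolean-related fields of ColumnStats.
final case class BooleanColumnStats(numNulls: Long, numTrues: Long, numFalses: Long) {

  /** Total rows covered by these counters (assumed to be the table's row count). */
  def rowCount: Long = numNulls + numTrues + numFalses

  /** Fraction of rows for which `col = value` holds; NULL rows never match. */
  def selectivity(value: Boolean): Double = {
    val matching = if (value) numTrues else numFalses
    if (rowCount == 0L) 0.0 else matching.toDouble / rowCount
  }
}

object BooleanColumnStatsExample {
  def main(args: Array[String]): Unit = {
    val stats = BooleanColumnStats(numNulls = 10L, numTrues = 60L, numFalses = 30L)
    println(stats.selectivity(true))
    println(stats.selectivity(false))
  }
}
```

A histogram over a boolean column would carry exactly the same information as these two numbers, which is the argument being made in the reply.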
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79924979 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. + */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None, --- End diff -- Yes, but do we have a primitive numeric type in Scala?
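On the question above: Scala's numeric value types (`Int`, `Long`, `Double`, and so on) share only the supertype `AnyVal`, which carries no comparison or arithmetic methods, so there is no single "primitive numeric" type to put in the field. One possible alternative to `Option[Any]`, shown purely as a sketch and not something the pull request does, is to make the bounds generic and obtain comparison from the standard `Ordering[T]` type class. `TypedRange` is a hypothetical name.

```Scala
// A sketch only: TypedRange is a hypothetical type, not part of the pull request.
// The comparison is supplied by an implicit Ordering[T] from the standard library.
final case class TypedRange[T](min: Option[T], max: Option[T])(implicit ord: Ordering[T]) {

  /** True when `v` lies within [min, max]; a missing bound is treated as unbounded. */
  def contains(v: T): Boolean =
    min.forall(ord.lteq(_, v)) && max.forall(ord.gteq(_, v))
}

object TypedRangeExample {
  def main(args: Array[String]): Unit = {
    val intRange = TypedRange(min = Some(1), max = Some(10))
    val doubleRange = TypedRange[Double](min = Some(0.5), max = None)
    println(intRange.contains(5))
    println(intRange.contains(11))
    println(doubleRange.contains(100.0))
  }
}
```

The trade-off is that a map of per-column statistics would then hold values with differing type parameters, which may be why an untyped `Option[Any]` paired with a `dataType` field is simpler in practice.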
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79921689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79893014 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. 
+ */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None, +min: Option[Any] = None, +ndv: Option[Long] = None, +avgColLen: Option[Double] = None, +maxColLen: Option[Long] = None, +numTrues: Option[Long] = None, +numFalses: Option[Long] = None) { + + override def toString: String = "ColumnStats(" + simpleString + ")" + + def simpleString: String = { +Seq(s"numNulls=$numNulls", + if (max.isDefined) s"max=${max.get}" else "", + if (min.isDefined) s"min=${min.get}" else "", + if (ndv.isDefined) s"ndv=${ndv.get}" else "", + if (avgColLen.isDefined) s"avgColLen=${avgColLen.get}" else "", + if (maxColLen.isDefined) s"maxColLen=${maxColLen.get}" else "", + if (numTrues.isDefined) s"numTrues=${numTrues.get}" else "", + if (numFalses.isDefined) s"numFalses=${numFalses.get}" else "" +).filter(_.nonEmpty).mkString(", ") + } +} + +object ColumnStats { + def fromString(str: String, dataType: DataType): ColumnStats = { --- End diff -- Suggestion: Based on fromString, it looks like simpleString above is effectively used as a serializer to store stats in the catalog. If so, could you rename it to catalogRepresentation or something similar so that this is clear?
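The concern can be made concrete with a small round-trip sketch: once a string form is parsed back by a `fromString`-style method, it is a storage format rather than a display string, and the name should say so. The code below is a minimal illustration under stated assumptions; `SimpleColumnStats` and `fromCatalogRepresentation` are hypothetical, only three of the fields are modelled, and the body of the real `fromString` is not shown in this thread.

```Scala
// Hypothetical, reduced model of ColumnStats with a serializer/parser pair.
final case class SimpleColumnStats(
    numNulls: Long,
    ndv: Option[Long] = None,
    maxColLen: Option[Long] = None) {

  /** "key=value" pairs joined by ", "; undefined optional fields are omitted. */
  def catalogRepresentation: String =
    (Seq(s"numNulls=$numNulls") ++
      ndv.map(v => s"ndv=$v") ++
      maxColLen.map(v => s"maxColLen=$v")).mkString(", ")
}

object SimpleColumnStats {

  /** Inverse of `catalogRepresentation`. */
  def fromCatalogRepresentation(str: String): SimpleColumnStats = {
    val fields: Map[String, String] =
      str.split(",").map(_.trim).filter(_.nonEmpty).map { kv =>
        val parts = kv.split("=", 2)
        parts(0) -> parts(1)
      }.toMap
    SimpleColumnStats(
      numNulls = fields("numNulls").toLong,
      ndv = fields.get("ndv").map(_.toLong),
      maxColLen = fields.get("maxColLen").map(_.toLong))
  }

  def main(args: Array[String]): Unit = {
    val stats = SimpleColumnStats(numNulls = 3L, ndv = Some(5L))
    val restored = fromCatalogRepresentation(stats.catalogRepresentation)
    println(restored == stats)
  }
}
```

Any change to the string layout in such a design silently changes what is persisted, which is why a name like `catalogRepresentation` is a useful signal to later editors.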
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79891286 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. + */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None, +min: Option[Any] = None, +ndv: Option[Long] = None, +avgColLen: Option[Double] = None, +maxColLen: Option[Long] = None, +numTrues: Option[Long] = None, --- End diff -- This seems special-cased for booleans. If we're planning to maintain histograms or frequent values, it would seem that this is unnecessary.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79897549 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis
--- End diff --
I think catalyst.resolver gives you a comparator based on the conf. It might be cleaner to just use that.
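A rough sketch of what the suggestion above amounts to: pick a name comparator once, based on the case-sensitivity setting, and pass it around instead of branching on a boolean at every comparison. The `Resolver` alias and the two comparator values below are local stand-ins defined in this snippet; they are modelled on the shape of a name comparator and are assumptions, not imports from Spark. `resolverFor` and `resolveColumns` are hypothetical helper names.

```scala
object ResolverSketch {
  // Local stand-ins; assumptions for illustration, not Spark's own definitions.
  type Resolver = (String, String) => Boolean
  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  /** Choose the comparator once, from the configuration flag. */
  def resolverFor(caseSensitive: Boolean): Resolver =
    if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution

  /**
   * Resolve each requested column against the available names, dropping duplicates.
   * Unknown names fail, mirroring the "Invalid column name" check in the diff.
   */
  def resolveColumns(
      available: Seq[String],
      requested: Seq[String],
      resolver: Resolver): Seq[String] =
    requested.map { col =>
      available.find(resolver(_, col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }.distinct
}
```

With a comparator value in hand, the column-matching loop no longer needs the `if (caseSensitive) ... else ...` branch, and the deduplication step falls out of `distinct` on the resolved names.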
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user srinathshankar commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79893449
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it *defaults to the product of children's `sizeInBytes`. * @param rowCount Estimated number of rows. + * @param colStats Column-level statistics. * @param isBroadcastable If true, output is small enough to be used in a broadcast join. */ case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, +colStats: Map[String, ColumnStats] = Map.empty, isBroadcastable: Boolean = false) { + override def toString: String = "Statistics(" + simpleString + ")" /** Readable string representation for the Statistics. */ def simpleString: String = { Seq(s"sizeInBytes=$sizeInBytes", if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + if (colStats.nonEmpty) s"colStats=$colStats" else "", s"isBroadcastable=$isBroadcastable" -).filter(_.nonEmpty).mkString("", ", ", "") +).filter(_.nonEmpty).mkString(", ") + } +} + +/** + * Statistics for a column. + * @param ndv Number of distinct values of the column. + */ +case class ColumnStats( +dataType: DataType, +numNulls: Long, +max: Option[Any] = None,
--- End diff --
Shouldn't max/min be numeric? (You're not planning on putting strings in here, right?)
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79767198
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation( dataSource.resolveRelation(), + expectedOutputAttributes = Some(simpleCatalogRelation.output),
--- End diff --
Yeah, I'm doing this. I'll submit it tomorrow :)
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79766998
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation( dataSource.resolveRelation(), + expectedOutputAttributes = Some(simpleCatalogRelation.output),
--- End diff --
Could you create a separate PR for this bug fix?
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79766674
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,28 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: + * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. + * Example SQL for analyzing table : * {{{ * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; + * ANALYZE TABLE table COMPUTE STATISTICS;
--- End diff --
You can combine them as `ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];`
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79707997
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: - * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; - * }}}
--- End diff --
This is to show the syntax we support. If you check, almost all of the DDL commands put the syntax before the function. It helps others understand your implementation.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79705637
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: - * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; - * }}}
--- End diff --
@gatorsmile This comment described a limitation: we only supported ANALYZE TABLE with NOSCAN. We no longer have this limitation.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79571121
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala ---
@@ -129,3 +89,51 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend Seq.empty[Row] } } + +object AnalyzeTableCommand extends Logging { + + def calculateTotalSize(sparkSession: SparkSession, catalogTable: CatalogTable): Long = { +// This method is mainly based on +// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) +// in Hive 0.13 (except that we do not use fs.getContentSummary). +// TODO: Generalize statistics collection. +// TODO: Why fs.getContentSummary returns wrong size on Jenkins? +// Can we use fs.getContentSummary in future? +// Seems fs.getContentSummary returns wrong table size on Jenkins. So we use +// countFileSize to count the table size. +val stagingDir = +sparkSession.sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
--- End diff --
FYI, there is an indentation mistake here.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79550951
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsTest.scala ---
@@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, Statistics} +import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, AnalyzeTableCommand} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with TestHiveSingleton with SQLTestUtils { + + def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { +val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) +val operators = parsed.collect { + case a: AnalyzeTableCommand => a + case b: AnalyzeColumnCommand => b + case o => o
--- End diff --
?
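The "?" above presumably questions the `case o => o` line. A small sketch of why a catch-all inside `collect` is suspicious: it makes the partial function total, so `collect` stops filtering and returns every element. The node types below are hypothetical stand-ins, and `Seq.collect` is used here in place of the plan's own `collect`, on the assumption that both keep exactly the elements the partial function is defined for.

```scala
// Hypothetical node types standing in for logical plan operators.
sealed trait Node
final case class AnalyzeTable(name: String) extends Node
final case class AnalyzeColumn(name: String) extends Node
final case class Other(name: String) extends Node

object CollectSketch {
  val nodes: Seq[Node] = Seq(Other("project"), AnalyzeColumn("t"), Other("relation"))

  // With a catch-all case the partial function is defined everywhere,
  // so nothing is filtered out.
  val withCatchAll: Seq[Node] = nodes.collect {
    case a: AnalyzeTable => a
    case b: AnalyzeColumn => b
    case o => o
  }

  // Without it, only the analyze commands survive.
  val commandsOnly: Seq[Node] = nodes.collect {
    case a: AnalyzeTable => a
    case b: AnalyzeColumn => b
  }
}
```

In the test helper from the diff, the catch-all would only go unnoticed when the parsed plan consists of a single node, because then the size check still sees exactly one operator.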
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79550718 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsTest.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, Statistics} +import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, AnalyzeTableCommand} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +trait StatisticsTest extends QueryTest with TestHiveSingleton with SQLTestUtils { + + def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { +val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) +val operators = parsed.collect { + case a: AnalyzeTableCommand => a + case b: AnalyzeColumnCommand => b + case o => o +} + +assert(operators.size === 1) +if (operators(0).getClass() != c) { + fail( +s"""$analyzeCommand expected command: $c, but got ${operators(0)} + |parsed command: + |$parsed + """.stripMargin)
--- End diff --
Please update the syntax.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79547634 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79547296 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79546838 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79546259 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val attributesToAnalyze = mutable.MutableList[Attribute]() +val caseSensitive = sessionState.conf.caseSensitiveAnalysis +columnNames.foreach { col => + val exprOption = relation.output.find { attr => +if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col) + } + val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col.")) + // do deduplication + if (!attributesToAnalyze.contains(expr)) { +attributesToAnalyze += expr + } +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) => +(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr)) + }.toMap + + val statistics = Statistics( +sizeInBytes = newTotalSize, +rowCount = Some(rowCount), +colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())) + sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
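A note on the `colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map())` expression quoted above: with Scala's immutable `Map`, `a ++ b` keeps the value from `b` when both maps contain the same key. The sketch below is only an illustration with plain `Map[String, Long]` stand-ins (the object and value names are hypothetical, not types from this PR). It shows which operand order lets newly computed stats replace stored ones, if that is the intent.

```scala
object MergeColStats {
  // Stand-ins: column name -> some statistic. Hypothetical data, not the PR's ColumnStats.
  val existing: Map[String, Long] = Map("key" -> 10L, "value" -> 7L)
  val fresh: Map[String, Long] = Map("key" -> 12L)

  // On duplicate keys the right-hand operand of ++ wins.
  val oldWins: Map[String, Long] = fresh ++ existing // same operand order as the quoted diff
  val newWins: Map[String, Long] = existing ++ fresh // fresh stats replace stored ones

  def main(args: Array[String]): Unit = {
    println(oldWins("key")) // 10: the stored value survives
    println(newWins("key")) // 12: the fresh value survives
  }
}
```

In both orders, columns that were not re-analyzed (here `value`) keep their stored entry.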
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79544029 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val quotedColumn = "x.yz" +val quotedName = s"`$quotedColumn`" +withTable(table) { + sql(s"CREATE TABLE $table (abc int, $quotedName string)") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == s"Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC") +} +assert(invalidErr.message == s"Invalid column name: ABC.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${quotedName.toUpperCase}, " + + s"ABC, $quotedName") +val df = sql(s"SELECT * FROM $table") +val stats = df.queryExecution.analyzed.collect { + case rel: MetastoreRelation => +val colStats = rel.catalogTable.stats.get.colStats +// check deduplication +assert(colStats.size == 2) +assert(colStats.contains(quotedColumn)) +assert(colStats.contains("abc")) +} +assert(stats.size == 1) + } +} + } + + private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = { +values.filter(_.isDefined).map(_.get) + } + + 
test("column-level statistics for integral type columns") { +val values = (0 to 5).map { i => + if (i % 2 == 0) None else Some(i) +} +val data = values.map { i => + (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong)) +} + +val df = data.toDF("c1", "c2", "c3", "c4") +val nonNullValues = getNonNullValues[Int](values) +val statsSeq = df.schema.map { f => + val colStats = ColumnStats( +dataType = f.dataType, +numNulls = values.count(_.isEmpty), +max = Some(nonNullValues.max), +min = Some(nonNullValues.min), +ndv = Some(nonNullValues.distinct.length.toLong)) + (f.name, colStats) +} +checkColStats(df, statsSeq) + } + + test("column-level statistics for fractional type columns") { +val values = (0 to 5).map { i => + if (i == 0) None else Some(i + i * 0.01d) +} +val data = values.map { i => + (i.map(_.toFloat), i.map(_.toDouble), i.map(Decimal(_))) +} + +val df = data.toDF("c1", "c2", "c3") +val nonNullValues = getNonNullValues[Double](values) +val statsSeq = df.schema.map { f => + val colStats = ColumnStats( +dataType = f.dataType, +
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79543613 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val quotedColumn = "x.yz" +val quotedName = s"`$quotedColumn`" +withTable(table) { + sql(s"CREATE TABLE $table (abc int, $quotedName string)") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == s"Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC") +} +assert(invalidErr.message == s"Invalid column name: ABC.") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${quotedName.toUpperCase}, " + + s"ABC, $quotedName") +val df = sql(s"SELECT * FROM $table") +val stats = df.queryExecution.analyzed.collect { + case rel: MetastoreRelation => +val colStats = rel.catalogTable.stats.get.colStats +// check deduplication +assert(colStats.size == 2) +assert(colStats.contains(quotedColumn)) +assert(colStats.contains("abc")) +} +assert(stats.size == 1) + } +} + } + + private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = { +values.filter(_.isDefined).map(_.get) + } + + 
test("column-level statistics for integral type columns") { +val values = (0 to 5).map { i => + if (i % 2 == 0) None else Some(i) +} +val data = values.map { i => + (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong)) +} + +val df = data.toDF("c1", "c2", "c3", "c4") +val nonNullValues = getNonNullValues[Int](values) +val statsSeq = df.schema.map { f => --- End diff -- Please rename all the `statsSeq` to `expectedColStats`
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79543159 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val quotedColumn = "x.yz" +val quotedName = s"`$quotedColumn`" +withTable(table) { + sql(s"CREATE TABLE $table (abc int, $quotedName string)") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == s"Invalid column name: key.") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC") +} +assert(invalidErr.message == s"Invalid column name: ABC.") --- End diff -- A useless `s`
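For context on the `s` remark, here is a minimal sketch in plain Scala (no Spark needed; the object and value names are made up for illustration). It shows that the `s` interpolator changes nothing when the literal contains no `$` placeholder, so the prefix can simply be dropped from the assertion.

```scala
object UselessInterpolator {
  // No `$` placeholder inside the literal, so the `s` prefix is redundant.
  val withPrefix: String = s"Invalid column name: ABC."
  val plain: String = "Invalid column name: ABC."

  // With a placeholder, the prefix is required for substitution to happen.
  val col: String = "ABC"
  val interpolated: String = s"Invalid column name: $col."

  def main(args: Array[String]): Unit = {
    println(withPrefix == plain)   // true
    println(interpolated == plain) // true
  }
}
```

All three values hold the same string; only `interpolated` needs the prefix.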
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79543164 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} + } + + test("check correctness of columns") { +val table = "tbl" +val quotedColumn = "x.yz" +val quotedName = s"`$quotedColumn`" +withTable(table) { + sql(s"CREATE TABLE $table (abc int, $quotedName string)") + + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key") + } + assert(invalidColError.message == s"Invalid column name: key.") --- End diff -- A useless `s`
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79543008 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.ColumnStats +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + import testImplicits._ + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +intercept[AnalysisException] { --- End diff -- This is a `ParseException`, right?
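In Spark's source, `ParseException` is a subclass of `AnalysisException`, which would explain why `intercept[AnalysisException]` passes even when the failure happens in the parser. The sketch below illustrates the point with stand-in exception classes and a simplified `intercept` helper. Everything in it is hypothetical and written for this note; it is not Spark or ScalaTest code.

```scala
import scala.reflect.ClassTag

object InterceptSketch {
  // Stand-ins for the exception hierarchy; hypothetical, for illustration only.
  class AnalysisError(msg: String) extends Exception(msg)
  class ParseError(msg: String) extends AnalysisError(msg)

  // Simplified imitation of an intercept helper: returns the thrown exception
  // when it is an instance of T, and fails with AssertionError otherwise.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val thrown: Option[Throwable] =
      try { body; None } catch { case t: Throwable => Some(t) }
    thrown match {
      case Some(t) if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
      case Some(t) =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, got $t")
      case None =>
        throw new AssertionError("Expected an exception, none was thrown")
    }
  }

  // Toy parser: rejects a column-statistics command that lists no columns.
  def parse(sql: String): Unit =
    if (sql.trim.endsWith("FOR COLUMNS")) throw new ParseError("no columns given")

  def main(args: Array[String]): Unit = {
    val sql = "ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS"
    // Passes, but would also pass for any other AnalysisError.
    intercept[AnalysisError](parse(sql))
    // Passes only when the failure really is a parse failure.
    val e = intercept[ParseError](parse(sql))
    println(e.getMessage)
  }
}
```

Intercepting the subclass makes the test state where the failure is expected to occur, and a later assertion on the message can stay as it is.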
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79542607 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN - * option (other options are passed on to Hive) e.g.: - * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; - * }}} --- End diff -- Any reason why you removed it? Since this is out-dated, you just need to update it.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79521372 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.plans.logical.BasicColStats +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +val noColumnError = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} +assert(noColumnError.message == "Need to specify the columns to analyze. 
Usage: " + + "ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS key, value") + +withTable(table) { + sql(s"CREATE TABLE $table (key INT, value STRING)") + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS k") + } + assert(invalidColError.message == s"Invalid column name: k") + + val duplicateColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value, key") + } + assert(duplicateColError.message == s"Duplicate column name: key") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS keY") +} +assert(invalidErr.message == s"Invalid column name: keY") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val duplicateErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value, vaLue") +} +assert(duplicateErr.message == s"Duplicate column name: vaLue") + } +} + } + + test("basic statistics for integral type columns") { +val rdd = sparkContext.parallelize(Seq("1", null, "2", "3", null)).map { i => + if (i != null) Row(i.toByte, i.toShort, i.toInt, i.toLong) else Row(i, i, i, i) --- End diff -- Cool, please add some salt to this when you fix (as I don't think mine is perfect anyway :)).
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79520564 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.plans.logical.BasicColStats +import org.apache.spark.sql.execution.command.AnalyzeColumnCommand +import org.apache.spark.sql.types._ + +class StatisticsColumnSuite extends StatisticsTest { + + test("parse analyze column commands") { +val table = "table" +assertAnalyzeCommand( + s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value", + classOf[AnalyzeColumnCommand]) + +val noColumnError = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS") +} +assert(noColumnError.message == "Need to specify the columns to analyze. 
Usage: " + + "ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS key, value") + +withTable(table) { + sql(s"CREATE TABLE $table (key INT, value STRING)") + val invalidColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS k") + } + assert(invalidColError.message == s"Invalid column name: k") + + val duplicateColError = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value, key") + } + assert(duplicateColError.message == s"Duplicate column name: key") + + withSQLConf("spark.sql.caseSensitive" -> "true") { +val invalidErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS keY") +} +assert(invalidErr.message == s"Invalid column name: keY") + } + + withSQLConf("spark.sql.caseSensitive" -> "false") { +val duplicateErr = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value, vaLue") +} +assert(duplicateErr.message == s"Duplicate column name: vaLue") + } +} + } + + test("basic statistics for integral type columns") { +val rdd = sparkContext.parallelize(Seq("1", null, "2", "3", null)).map { i => + if (i != null) Row(i.toByte, i.toShort, i.toInt, i.toLong) else Row(i, i, i, i) --- End diff -- @HyukjinKwon Seems better. Let me change the code based on this. Thanks.
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79508043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, BasicColStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. 
+ */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val validColumns = mutable.MutableList[NamedExpression]() +val resolver = sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.resolve(col.split("\\."), resolver) + if (exprOption.isEmpty) { +throw new AnalysisException(s"Invalid column name: $col") + } + if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) { +throw new AnalysisException(s"Duplicate column name: $col") + } + validColumns += exprOption.get +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the BasicColStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +validColumns.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val colStats = validColumns.zipWithIndex.map { case (expr, i) => +val colInfo = statsRow.getStruct(i + 1, ColumnStatsStruct.statsNumber) +val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo) +(expr.name, colStats) + }.toMap + + val statistics = +Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), basicColStats = colStats) --- End diff -- For the "all or nothing" statistics approach, we can better maintain
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79502791

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+    tableIdent: TableIdentifier,
+    columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    // check correctness of column names
+    val validColumns = mutable.MutableList[NamedExpression]()
+    val resolver = sessionState.conf.resolver
+    columnNames.foreach { col =>
+      val exprOption = relation.resolve(col.split("\\."), resolver)
+      if (exprOption.isEmpty) {
+        throw new AnalysisException(s"Invalid column name: $col")
+      }
+      if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+        throw new AnalysisException(s"Duplicate column name: $col")
+      }
+      validColumns += exprOption.get
+    }
+
+    relation match {
+      case catalogRel: CatalogRelation =>
+        updateStats(catalogRel.catalogTable,
+          AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable))
+
+      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+      case otherRelation =>
+        throw new AnalysisException("ANALYZE TABLE is not supported for " +
+          s"${otherRelation.nodeName}.")
+    }
+
+    def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+      // Collect statistics per column.
+      // The first element in the result will be the overall row count, the following elements
+      // will be structs containing all column stats.
+      // The layout of each struct follows the layout of the BasicColStats.
+      val ndvMaxErr = sessionState.conf.ndvMaxError
+      val expressions = Count(Literal(1)).toAggregateExpression() +:
+        validColumns.map(ColumnStatsStruct(_, ndvMaxErr))
+      val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+      val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
+        .queryExecution.toRdd.collect().head
+
+      // unwrap the result
+      val rowCount = statsRow.getLong(0)
+      val colStats = validColumns.zipWithIndex.map { case (expr, i) =>
+        val colInfo = statsRow.getStruct(i + 1, ColumnStatsStruct.statsNumber)
+        val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo)
+        (expr.name, colStats)
+      }.toMap
+
+      val statistics =
+        Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), basicColStats = colStats)
--- End diff --

@hvanhovell Agree. If we want to support refresh column stats indepe
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15090#discussion_r79496929

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+    tableIdent: TableIdentifier,
+    columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    // check correctness of column names
+    val validColumns = mutable.MutableList[NamedExpression]()
+    val resolver = sessionState.conf.resolver
+    columnNames.foreach { col =>
+      val exprOption = relation.resolve(col.split("\\."), resolver)
+      if (exprOption.isEmpty) {
+        throw new AnalysisException(s"Invalid column name: $col")
+      }
+      if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+        throw new AnalysisException(s"Duplicate column name: $col")
--- End diff --

Agree. Just do not forget to add a test case for it.
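
For illustration only, here is a minimal sketch of what a test for the duplicate-column check quoted above might exercise. It does not use Spark: `Column`, `ColumnValidation.validateColumns`, and the use of `IllegalArgumentException` are hypothetical stand-ins for `NamedExpression`, the loop in `AnalyzeColumnCommand.run`, and `AnalysisException`, and the resolver is assumed to be case-insensitive. The real test in the PR may look quite different.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for a resolved column; `id` plays the role of exprId.
final case class Column(id: Long, name: String)

object ColumnValidation {
  // Mirrors the shape of the check in the diff (not Spark code): reject names
  // that do not resolve, and names that resolve to an already-collected column.
  def validateColumns(
      schema: Seq[Column],
      columnNames: Seq[String],
      caseSensitive: Boolean = false): Seq[Column] = {
    // Assumed resolver: plain equality, or case-insensitive equality.
    val resolver: (String, String) => Boolean =
      if (caseSensitive) (a, b) => a == b else (a, b) => a.equalsIgnoreCase(b)
    val validColumns = mutable.ListBuffer[Column]()
    columnNames.foreach { col =>
      val resolved = schema.find(c => resolver(c.name, col)).getOrElse(
        throw new IllegalArgumentException(s"Invalid column name: $col"))
      if (validColumns.exists(_.id == resolved.id)) {
        throw new IllegalArgumentException(s"Duplicate column name: $col")
      }
      validColumns += resolved
    }
    validColumns.toList
  }
}

object ColumnValidationCheck extends App {
  import ColumnValidation._
  val schema = Seq(Column(1L, "key"), Column(2L, "value"))

  // Distinct, valid names are accepted in the order given.
  assert(validateColumns(schema, Seq("key", "value")).map(_.name) == Seq("key", "value"))

  // "key" and "KEY" resolve to the same column under the case-insensitive resolver.
  val dup = Try(validateColumns(schema, Seq("key", "KEY")))
  assert(dup.isFailure && dup.failed.get.getMessage == "Duplicate column name: KEY")

  // A name that does not resolve is rejected.
  val bad = Try(validateColumns(schema, Seq("missing")))
  assert(bad.isFailure && bad.failed.get.getMessage == "Invalid column name: missing")
}
```

Comparing resolved identifiers rather than raw strings is what lets the check catch duplicates that differ only in case, which is the reason the quoted diff compares `exprId` values.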