spark git commit: [SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 b9b35b959 -> 787790b3c


[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if a partition column contains a NULL value. This PR avoids the NPE by replacing the NULL values with the default partition placeholder.

## How was this patch tested?

Added a unit test.

Closes #22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido
Signed-off-by: Wenchen Fan
(cherry picked from commit c220cc42abebbc98a6110b50f787eb6d338c2d97)
Signed-off-by: Wenchen Fan


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/787790b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/787790b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/787790b3

Branch: refs/heads/branch-2.3
Commit: 787790b3c733085b8b5e95cf832dedd481ab3b9a
Parents: b9b35b9
Author: Marco Gaido
Authored: Tue Aug 14 00:59:18 2018 +0800
Committer: Wenchen Fan
Committed: Tue Aug 14 00:59:54 2018 +0800

----------------------------------------------------------------------
 .../command/AnalyzePartitionCommand.scala       | 10 ++++++++--
 .../spark/sql/StatisticsCollectionSuite.scala   | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/787790b3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b54b22..18fefa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -140,7 +140,13 @@ case class AnalyzePartitionCommand(
     val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
 
     df.collect().map { r =>
-      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val partitionColumnValues = partitionColumns.indices.map { i =>
+        if (r.isNullAt(i)) {
+          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+        } else {
+          r.get(i).toString
+        }
+      }
       val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(partitionColumns.size))
       (spec, count)


http://git-wip-us.apache.org/repos/asf/spark/blob/787790b3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b11e798..0e7209a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -198,6 +198,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+             |CREATE TABLE $table (value string, name string)
+             |USING PARQUET
+             |PARTITIONED BY (name)
+             |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
+        df.write.mode("overwrite").insertInto(table)
+        sql(s"ANALYZE TABLE $table PARTITION (name) COMPUTE STATISTICS")
+        val partitions = spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+        assert(partitions.head.stats.get.rowCount.get == 2)
+      }
+    }
+  }
+
   test("number format in statistics") {
     val numbers = Seq(
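The core of the change boils down to a null-safe mapping from collected partition column values to partition spec strings. The following standalone sketch is not code from the patch: the Seq[Any] stands in for a collected Spark Row, and DefaultPartitionName mirrors the value of ExternalCatalogUtils.DEFAULT_PARTITION_NAME.

// Standalone sketch of the null-safe partition-spec mapping this patch adds.
// Assumptions: Seq[Any] models a collected Spark Row; DefaultPartitionName
// mirrors ExternalCatalogUtils.DEFAULT_PARTITION_NAME.
object NullSafePartitionSpec {
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  def partitionSpec(partitionColumnNames: Seq[String], row: Seq[Any]): Map[String, String] = {
    val values = partitionColumnNames.indices.map { i =>
      // Before the patch this was effectively row(i).toString, which threw
      // a NullPointerException when the partition value was null.
      if (row(i) == null) DefaultPartitionName else row(i).toString
    }
    partitionColumnNames.zip(values).toMap
  }

  def main(args: Array[String]): Unit = {
    // A NULL partition value now maps to the placeholder instead of crashing:
    println(partitionSpec(Seq("name"), Seq(null)))
    // Map(name -> __HIVE_DEFAULT_PARTITION__)
  }
}

Reusing the same placeholder the catalog uses for null partition directories keeps the computed spec consistent with how the partition is actually stored.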
spark git commit: [SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values
Repository: spark
Updated Branches:
  refs/heads/master b804ca577 -> c220cc42a


[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if a partition column contains a NULL value. This PR avoids the NPE by replacing the NULL values with the default partition placeholder.

## How was this patch tested?

Added a unit test.

Closes #22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c220cc42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c220cc42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c220cc42

Branch: refs/heads/master
Commit: c220cc42abebbc98a6110b50f787eb6d338c2d97
Parents: b804ca5
Author: Marco Gaido
Authored: Tue Aug 14 00:59:18 2018 +0800
Committer: Wenchen Fan
Committed: Tue Aug 14 00:59:18 2018 +0800

----------------------------------------------------------------------
 .../command/AnalyzePartitionCommand.scala       | 10 ++++++++--
 .../spark/sql/StatisticsCollectionSuite.scala   | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b54b22..18fefa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -140,7 +140,13 @@ case class AnalyzePartitionCommand(
     val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
 
     df.collect().map { r =>
-      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val partitionColumnValues = partitionColumns.indices.map { i =>
+        if (r.isNullAt(i)) {
+          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+        } else {
+          r.get(i).toString
+        }
+      }
       val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(partitionColumns.size))
       (spec, count)


http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 60fa951..cb562d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -204,6 +204,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+             |CREATE TABLE $table (value string, name string)
+             |USING PARQUET
+             |PARTITIONED BY (name)
+             |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
+        df.write.mode("overwrite").insertInto(table)
+        sql(s"ANALYZE TABLE $table PARTITION (name) COMPUTE STATISTICS")
+        val partitions = spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+        assert(partitions.head.stats.get.rowCount.get == 2)
+      }
+    }
+  }
+
   test("number format in statistics") {
     val numbers = Seq(
       BigInt(0) -> (("0.0 B", "0")),
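For reference, the user-visible behavior after the fix can be sketched as a spark-shell session. This is a hedged sketch assuming a SparkSession named `spark` with the patch applied; the table name analyze_null is illustrative.

// Assumes a spark-shell (SparkSession `spark`) running with this patch;
// the table name is illustrative, not from the patch.
import spark.implicits._

spark.sql("CREATE TABLE analyze_null (value STRING, name STRING) USING PARQUET PARTITIONED BY (name)")
Seq(("a", null), ("b", null)).toDF("value", "name")
  .write.mode("overwrite").insertInto("analyze_null")

// Before this patch, the NULL partition values made the next command fail
// with a NullPointerException; with the patch it records row-count stats
// against the placeholder partition instead.
spark.sql("ANALYZE TABLE analyze_null PARTITION (name) COMPUTE STATISTICS")

// The NULL values surface as the default partition placeholder,
// e.g. name=__HIVE_DEFAULT_PARTITION__:
spark.sql("SHOW PARTITIONS analyze_null").show(truncate = false)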