spark git commit: [SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

2018-08-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 b9b35b959 -> 787790b3c


[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if a partition column contains a NULL value.

This PR avoids the NPE by replacing the `NULL` values with the default partition placeholder.
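
For reference, a minimal reproduction sketch, adapted from the unit test added in this patch; it assumes a spark-shell session (so `spark` and `spark.implicits._` are available) and an illustrative table name:

```scala
// Reproduction sketch adapted from the test added in this PR: a partitioned
// table whose partition column holds only NULL values. Before this fix, the
// ANALYZE command below threw a NullPointerException while converting the
// NULL partition value with .toString.
import spark.implicits._

spark.sql(
  "CREATE TABLE analyze_partition_with_null (value STRING, name STRING) " +
    "USING PARQUET PARTITIONED BY (name)")
Seq(("a", null), ("b", null)).toDF("value", "name")
  .write.mode("overwrite").insertInto("analyze_partition_with_null")
spark.sql("ANALYZE TABLE analyze_partition_with_null PARTITION (name) COMPUTE STATISTICS")
```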

## How was this patch tested?

Added a unit test.

Closes #22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido 
Signed-off-by: Wenchen Fan 
(cherry picked from commit c220cc42abebbc98a6110b50f787eb6d338c2d97)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/787790b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/787790b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/787790b3

Branch: refs/heads/branch-2.3
Commit: 787790b3c733085b8b5e95cf832dedd481ab3b9a
Parents: b9b35b9
Author: Marco Gaido 
Authored: Tue Aug 14 00:59:18 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Aug 14 00:59:54 2018 +0800

--
 .../command/AnalyzePartitionCommand.scala     | 10 ++++++++--
 .../spark/sql/StatisticsCollectionSuite.scala | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/787790b3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b54b22..18fefa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -140,7 +140,13 @@ case class AnalyzePartitionCommand(
     val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
 
     df.collect().map { r =>
-      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val partitionColumnValues = partitionColumns.indices.map { i =>
+        if (r.isNullAt(i)) {
+          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+        } else {
+          r.get(i).toString
+        }
+      }
       val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(partitionColumns.size))
       (spec, count)

http://git-wip-us.apache.org/repos/asf/spark/blob/787790b3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b11e798..0e7209a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -198,6 +198,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+             |CREATE TABLE $table (value string, name string)
+             |USING PARQUET
+             |PARTITIONED BY (name)
+             |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
+        df.write.mode("overwrite").insertInto(table)
+        sql(s"ANALYZE TABLE $table PARTITION (name) COMPUTE STATISTICS")
+        val partitions = spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+        assert(partitions.head.stats.get.rowCount.get == 2)
+      }
+    }
+  }
+
   test("number format in statistics") {
     val numbers = Seq(
       BigInt(0) -> (("0.0 B", "0")),

spark git commit: [SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

2018-08-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master b804ca577 -> c220cc42a


[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if a partition column contains a NULL value.

This PR avoids the NPE by replacing the `NULL` values with the default partition placeholder.
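
In practice, a NULL partition value is keyed under the catalog's default partition name rather than crashing the command. A minimal sketch of that mapping, using a hypothetical helper (`partitionValues` is not part of the patch, which inlines this logic as shown in the diff below):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Hypothetical helper mirroring the patched logic: turn the partition columns
// of a collected Row into catalog-friendly strings, mapping NULL to the
// default partition name ("__HIVE_DEFAULT_PARTITION__").
def partitionValues(row: Row, numPartitionCols: Int): Seq[String] =
  (0 until numPartitionCols).map { i =>
    if (row.isNullAt(i)) ExternalCatalogUtils.DEFAULT_PARTITION_NAME
    else row.get(i).toString
  }
```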

## How was this patch tested?

Added a unit test.

Closes #22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c220cc42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c220cc42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c220cc42

Branch: refs/heads/master
Commit: c220cc42abebbc98a6110b50f787eb6d338c2d97
Parents: b804ca5
Author: Marco Gaido 
Authored: Tue Aug 14 00:59:18 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Aug 14 00:59:18 2018 +0800

--
 .../command/AnalyzePartitionCommand.scala     | 10 ++++++++--
 .../spark/sql/StatisticsCollectionSuite.scala | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b54b22..18fefa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -140,7 +140,13 @@ case class AnalyzePartitionCommand(
     val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
 
     df.collect().map { r =>
-      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val partitionColumnValues = partitionColumns.indices.map { i =>
+        if (r.isNullAt(i)) {
+          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+        } else {
+          r.get(i).toString
+        }
+      }
       val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(partitionColumns.size))
       (spec, count)

http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 60fa951..cb562d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -204,6 +204,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+             |CREATE TABLE $table (value string, name string)
+             |USING PARQUET
+             |PARTITIONED BY (name)
+             |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
+        df.write.mode("overwrite").insertInto(table)
+        sql(s"ANALYZE TABLE $table PARTITION (name) COMPUTE STATISTICS")
+        val partitions = spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+        assert(partitions.head.stats.get.rowCount.get == 2)
+      }
+    }
+  }
+
   test("number format in statistics") {
     val numbers = Seq(
       BigInt(0) -> (("0.0 B", "0")),

