Repository: spark
Updated Branches:
  refs/heads/master 09fcf96b8 -> cc465fd92


[SPARK-8138] [SQL] Improves error message when conflicting partition columns are found

This PR improves the error message shown when conflicting partition column
names are detected. Such conflicts can be particularly annoying and confusing
when there is a large number of partitions and only a handful of them happen
to contain unexpected temporary file(s). Now all suspicious directories are
listed, as shown below:

```
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:

        Partition column name list #0: b, c, d
        Partition column name list #1: b, c
        Partition column name list #2: b

For partitioned table directories, data files should only live in leaf directories. Please check the following directories for unexpected files:

        file:/tmp/foo/b=0
        file:/tmp/foo/b=1
        file:/tmp/foo/b=1/c=1
        file:/tmp/foo/b=0/c=0
```
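
For reference, here is a minimal sketch (not part of this commit; the /tmp/foo
paths and file names are hypothetical, chosen to mirror the example above) of a
directory layout that produces conflicting column name lists such as (b, c, d),
(b, c), and (b): stray data files left in non-leaf partition directories next to
a proper leaf directory. It only uses java.nio.file:

```
import java.nio.file.{Files, Paths}

object ConflictingPartitionLayoutSketch extends App {
  val root = Paths.get("/tmp/foo")

  // A proper leaf directory: contributes the full column name list (b, c, d).
  Files.createDirectories(root.resolve("b=0/c=0/d=0"))
  Files.createFile(root.resolve("b=0/c=0/d=0/part-00000.parquet"))

  // Stray data files in non-leaf directories: these contribute the shorter
  // column name lists (b) and (b, c), which conflict with the list above.
  Files.createDirectories(root.resolve("b=1/c=1"))
  Files.createFile(root.resolve("b=0/stray.parquet"))     // list: b
  Files.createFile(root.resolve("b=1/stray.parquet"))     // list: b
  Files.createFile(root.resolve("b=0/c=0/stray.parquet")) // list: b, c
  Files.createFile(root.resolve("b=1/c=1/stray.parquet")) // list: b, c
}
```

Reading /tmp/foo as a partitioned table after running this sketch would be
expected to fail with a message much like the one above, listing the non-leaf
directories b=0, b=1, b=0/c=0, and b=1/c=1 as suspicious.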

Author: Cheng Lian <l...@databricks.com>

Closes #6610 from liancheng/part-errmsg and squashes the following commits:

7d05f2c [Cheng Lian] Fixes Scala style issue
a149250 [Cheng Lian] Adds test case for the error message
6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
a935eb8 [Cheng Lian] Improves error message when conflicting partition columns are found


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc465fd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc465fd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc465fd9

Branch: refs/heads/master
Commit: cc465fd92482737c21971d82e30d4cf247acf932
Parents: 09fcf96
Author: Cheng Lian <l...@databricks.com>
Authored: Wed Jun 24 02:17:12 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jun 24 02:17:12 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/PartitioningUtils.scala   | 47 +++++++++++++++-----
 .../ParquetPartitionDiscoverySuite.scala        | 45 +++++++++++++++++++
 2 files changed, 82 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc465fd9/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c6f535d..8b2a45d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
    *   StringType
    * }}}
    */
-  private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
-    // Column names of all partitions must match
-    val distinctPartitionsColNames = values.map(_.columnNames).distinct
-
-    if (distinctPartitionsColNames.isEmpty) {
+  private[sql] def resolvePartitions(
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+    if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
-      assert(distinctPartitionsColNames.size == 1, {
-        val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
-        s"Conflicting partition column names detected:\n$list"
-      })
+      val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
+      assert(
+        distinctPartColNames.size == 1,
+        listConflictingPartitionColumns(pathsWithPartitionValues))
 
       // Resolves possible type conflicts for each column
+      val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
         resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
     }
   }
 
+  private[sql] def listConflictingPartitionColumns(
+      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+    val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
+
+    def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+
+    val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
+      case (path, partValues) => partValues.columnNames -> path
+    })
+
+    val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
+      case (names, index) =>
+        s"Partition column name list #$index: $names"
+    }
+
+    // Lists out those non-leaf partition directories that also contain files
+    val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
+
+    s"Conflicting partition column names detected:\n" +
+      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
+      "For partitioned table directories, data files should only live in leaf 
directories.\n" +
+      "And directories at the same level should have the same partition column 
name.\n" +
+      "Please check the following directories for unexpected files or " +
+      "inconsistent partition column names:\n" +
+      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+  }
+
   /**
   * Converts a string to a [[Literal]] with automatic type inference.  Currently only supports
   * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and

http://git-wip-us.apache.org/repos/asf/spark/blob/cc465fd9/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 01df189..d0ebb11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
     }
   }
+
+  test("listConflictingPartitionColumns") {
+    def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
+      val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
+        s"\tPartition column name list #$index: $list"
+      }.mkString("\n", "\n", "\n")
+
+      // scalastyle:off
+      s"""Conflicting partition column names detected:
+         |$conflictingColNameLists
+         |For partitioned table directories, data files should only live in leaf directories.
+         |And directories at the same level should have the same partition column name.
+         |Please check the following directories for unexpected files or inconsistent partition column names:
+         |${paths.map("\t" + _).mkString("\n", "\n", "")}
+       """.stripMargin.trim
+      // scalastyle:on
+    }
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), 
Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), 
Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", 
"file:/tmp/foo/b=1")))
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), 
Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), 
Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(
+          Seq("a"),
+          Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"),
+            PartitionValues(Seq("a"), Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1/b=foo"),
+            PartitionValues(Seq("a", "b"), Seq(Literal(1), 
Literal("foo")))))).trim ===
+        makeExpectedMessage(
+          Seq("a", "a, b"),
+          Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+  }
 }

