Repository: spark
Updated Branches:
  refs/heads/master cac9b1dea -> 6ea582e36


[SPARK-24181][SQL] Better error message for writing sorted data

## What changes were proposed in this pull request?

The exception message should clearly distinguish sorting from bucketing in 
`save` and `jdbc` writes.

Previously, when a user tried to write sorted data using `save` or 
`insertInto`, the exception message was `s"'$operation' does not support 
bucketing right now"`, which does not mention `sortBy` at all.

With this change, the message names the feature actually used: 
`s"'$operation' does not support bucketBy right now"` when only `bucketBy` is 
set, and `s"'$operation' does not support bucketBy and sortBy right now"` when 
both are set. In addition, using `sortBy` without `bucketBy` now throws an 
`AnalysisException` ("sortBy must be used together with bucketBy") instead of 
an `IllegalArgumentException`.
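
For illustration, a minimal sketch of the messages a user now sees. It assumes 
a SparkSession with `import spark.implicits._` in scope; the table name and 
paths are made up, and each call throws, so run them independently:

```scala
// Sketch only: made-up data, table name, and paths.
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

// sortBy without bucketBy (now an AnalysisException, previously an
// IllegalArgumentException from require):
//   "sortBy must be used together with bucketBy"
df.write.sortBy("j").saveAsTable("tt")

// bucketBy alone with save():
//   "'save' does not support bucketBy right now"
df.write.bucketBy(2, "i").parquet("/tmp/path")

// bucketBy combined with sortBy:
//   "'save' does not support bucketBy and sortBy right now"
df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
```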

## How was this patch tested?

New tests in `BucketedWriteSuite.scala` and `DataFrameReaderWriterSuite.scala`.

Author: DB Tsai <d_t...@apple.com>

Closes #21235 from dbtsai/fixException.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ea582e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ea582e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ea582e3

Branch: refs/heads/master
Commit: 6ea582e36ab0a2e4e01340f6fc8cfb8d493d567d
Parents: cac9b1d
Author: DB Tsai <d_t...@apple.com>
Authored: Wed May 9 09:15:16 2018 -0700
Committer: DB Tsai <d_t...@apple.com>
Committed: Wed May 9 09:15:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 ++++++---
 .../spark/sql/sources/BucketedWriteSuite.scala  | 27 +++++++++++++++++---
 .../sql/test/DataFrameReaderWriterSuite.scala   | 16 ++++++++++--
 3 files changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ea582e3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e183fa6..90bea2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -330,8 +330,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
-    if (sortColumnNames.isDefined) {
-    require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
+    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
+      throw new AnalysisException("sortBy must be used together with bucketBy")
     }
 
     numBuckets.map { n =>
@@ -340,8 +340,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def assertNotBucketed(operation: String): Unit = {
-    if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    if (getBucketSpec.isDefined) {
+      if (sortColumnNames.isEmpty) {
+        throw new AnalysisException(s"'$operation' does not support bucketBy right now")
+      } else {
+        throw new AnalysisException(s"'$operation' does not support bucketBy and sortBy right now")
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ea582e3/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 93f3efe..5ff1ea8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -60,7 +60,10 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
 
   test("specify sorting columns without bucketing columns") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
+    val e = intercept[AnalysisException] {
+      df.write.sortBy("j").saveAsTable("tt")
+    }
+    assert(e.getMessage == "sortBy must be used together with bucketBy;")
   }
 
   test("sorting by non-orderable column") {
@@ -74,7 +77,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").parquet("/tmp/path")
     }
-    assert(e.getMessage == "'save' does not support bucketing right now;")
+    assert(e.getMessage == "'save' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using save()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketBy and sortBy right now;")
   }
 
   test("write bucketed data using insertInto()") {
@@ -83,7 +95,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").insertInto("tt")
     }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
+    assert(e.getMessage == "'insertInto' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketBy and sortBy right now;")
   }
 
   private lazy val df = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ea582e3/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 14b1feb..b65058f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -276,7 +276,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("doubleOpt") == "6.7")
   }
 
-  test("check jdbc() does not support partitioning or bucketing") {
+  test("check jdbc() does not support partitioning, bucketBy or sortBy") {
     val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath)
 
     var w = df.write.partitionBy("value")
@@ -287,7 +287,19 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
 
     w = df.write.bucketBy(2, "value")
     e = intercept[AnalysisException](w.jdbc(null, null, null))
-    Seq("jdbc", "bucketing").foreach { s =>
+    Seq("jdbc", "does not support bucketBy right now").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
+    w = df.write.sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("sortBy must be used together with bucketBy").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
+    w = df.write.bucketBy(2, "value").sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "does not support bucketBy and sortBy right now").foreach { s 
=>
       
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
   }
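
As a usage note, the `jdbc()` writer fails during this validation before any 
connection is attempted, which is why the tests above can pass null connection 
arguments. A minimal sketch of the same checks outside ScalaTest (`messageOf` 
is a made-up helper; `df` is a DataFrame with a "value" column as in the test):

```scala
import org.apache.spark.sql.AnalysisException

// Sketch only: each jdbc() call below fails fast in the writer's
// bucketBy/sortBy validation, so the null url/table/properties are never used.
def messageOf(body: => Unit): String =
  try { body; "no exception" } catch { case e: AnalysisException => e.getMessage }

// "sortBy must be used together with bucketBy;"
println(messageOf(df.write.sortBy("value").jdbc(null, null, null)))

// "'jdbc' does not support bucketBy right now;"
println(messageOf(df.write.bucketBy(2, "value").jdbc(null, null, null)))

// "'jdbc' does not support bucketBy and sortBy right now;"
println(messageOf(df.write.bucketBy(2, "value").sortBy("value").jdbc(null, null, null)))
```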

