Repository: spark Updated Branches: refs/heads/master 8487902a9 -> f041e55ee
[SPARK-19618][SQL] Inconsistency wrt max. buckets allowed from Dataframe API vs SQL ## What changes were proposed in this pull request? Jira: https://issues.apache.org/jira/browse/SPARK-19618 Moved the check for validating number of buckets from `DataFrameWriter` to `BucketSpec` creation ## How was this patch tested? - Added more unit tests Author: Tejas Patil <tej...@fb.com> Closes #16948 from tejasapatil/SPARK-19618_max_buckets. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f041e55e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f041e55e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f041e55e Branch: refs/heads/master Commit: f041e55eefe1d8a995fed321c66bccbd8b8e5255 Parents: 8487902 Author: Tejas Patil <tej...@fb.com> Authored: Wed Feb 15 22:45:58 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Feb 15 22:45:58 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 5 ++-- .../org/apache/spark/sql/DataFrameWriter.scala | 1 - .../sql/sources/CreateTableAsSelectSuite.scala | 28 +++++++++++--------- .../spark/sql/sources/BucketedWriteSuite.scala | 10 ++++--- 4 files changed, 25 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 353e595..2b3b575 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -135,8 +135,9 @@ case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { - if (numBuckets <= 0) { - throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.") + if (numBuckets <= 0 || numBuckets >= 100000) { + throw new AnalysisException( + s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`") } override def toString: String = { http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1d834b1..cdae8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -275,7 +275,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } numBuckets.map { n => - require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.") BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 99da196..4a42f8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -206,7 +206,7 @@ class CreateTableAsSelectSuite } } - test("create table using as select - with non-zero buckets") { + test("create table using as select - with valid number of buckets") { val catalog = spark.sessionState.catalog withTable("t") { sql( @@ -222,19 +222,21 @@ class CreateTableAsSelectSuite } } - test("create table using as select - with zero buckets") { + test("create table using as select - with invalid number of buckets") { withTable("t") { - val e = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE t USING PARQUET - |OPTIONS (PATH '${path.toURI}') - |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin - ) - }.getMessage - assert(e.contains("Expected positive number of buckets, but got `0`")) + Seq(0, 100000).foreach(numBuckets => { + val e = intercept[AnalysisException] { + sql( + s""" + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '${path.toURI}') + |CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + ) + }.getMessage + assert(e.contains("Number of buckets should be greater than 0 but less than 100000")) + }) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f041e55e/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 8528dfc..61cef2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -38,10 +38,14 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt")) } - test("numBuckets not greater than 0 or less than 100000") { + test("numBuckets be greater than 0 but less than 100000") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") - intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt")) - intercept[IllegalArgumentException](df.write.bucketBy(100000, "i").saveAsTable("tt")) + + Seq(-1, 0, 100000).foreach(numBuckets => { + val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt")) + assert( + e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000")) + }) } test("specify sorting columns without bucketing columns") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org