This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c6dccc7dd41 [SPARK-39163][SQL] Throw an exception w/ error class for 
an invalid bucket file
c6dccc7dd41 is described below

commit c6dccc7dd412a95007f5bb2584d69b85ff9ebf8e
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu May 19 20:39:35 2022 +0300

    [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket 
file
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to throw a SparkException with the INVALID_BUCKET_FILE 
error class when a bucketed file scan encounters an invalid bucket file.
    
    ### Why are the changes needed?
    Porting the execution error for an invalid bucket file to the new error 
framework should improve user experience with Spark SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
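    By a new unit test in QueryExecutionErrorsSuite (moved from 
BucketedReadSuite) and by updating the existing check in AdaptiveQueryExecSuite.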
    
    Closes #36603 from panbingkun/SPARK-39163.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   |  3 +++
 .../spark/sql/errors/QueryExecutionErrors.scala    |  5 ++++
 .../spark/sql/execution/DataSourceScanExec.scala   |  4 ++--
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 28 ++++++++++++++++++++--
 .../adaptive/AdaptiveQueryExecSuite.scala          |  6 ++---
 .../spark/sql/sources/BucketedReadSuite.scala      | 23 ------------------
 6 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index e4ee09ea8a7..1a139c018e8 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -115,6 +115,9 @@
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
     "message" : [ "The index <indexValue> is out of bounds. The array has 
<arraySize> elements. To return NULL instead, use `try_element_at`. If 
necessary set <config> to \"false\" to bypass this error." ]
   },
+  "INVALID_BUCKET_FILE" : {
+    "message" : [ "Invalid bucket file: <path>" ]
+  },
   "INVALID_FIELD_NAME" : {
     "message" : [ "Field name <fieldName> is invalid: <path> is not a struct." 
],
     "sqlState" : "42000"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a155b0694b5..1e664100545 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2000,4 +2000,9 @@ object QueryExecutionErrors extends QueryErrorsBase {
         s"add ${toSQLValue(amount, IntegerType)} $unit to " +
         s"${toSQLValue(DateTimeUtils.microsToInstant(micros), 
TimestampType)}"))
   }
+
+  def invalidBucketFile(path: String): Throwable = {
+    new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = 
Array(path),
+      cause = null)
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7b627cef08..f5d349d975f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -618,8 +619,7 @@ case class FileSourceScanExec(
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(new Path(f.filePath).getName)
-          // TODO(SPARK-39163): Throw an exception w/ error class for an 
invalid bucket file
-          .getOrElse(throw new IllegalStateException(s"Invalid bucket file 
${f.filePath}"))
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
       }
 
     val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index bdc0772c1de..bbf6c0dda79 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.errors
 
-import java.io.IOException
-import java.net.URL
+import java.io.{File, IOException}
+import java.net.{URI, URL}
 import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, 
ResultSetMetaData}
 import java.util.{Locale, Properties, ServiceConfigurationError}
 
@@ -587,6 +587,30 @@ class QueryExecutionErrorsSuite
 
     JdbcDialects.unregisterDialect(testH2DialectUnrecognizedSQLType)
   }
+
+  test("INVALID_BUCKET_FILE: error if there exists any malformed bucket 
files") {
+    val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).
+      toDF("i", "j", "k").as("df1")
+
+    withTable("bucketed_table") {
+      df1.write.format("parquet").bucketBy(8, "i").
+        saveAsTable("bucketed_table")
+      val warehouseFilePath = new 
URI(spark.sessionState.conf.warehousePath).getPath
+      val tableDir = new File(warehouseFilePath, "bucketed_table")
+      Utils.deleteRecursively(tableDir)
+      df1.write.parquet(tableDir.getAbsolutePath)
+
+      val aggregated = spark.table("bucketed_table").groupBy("i").count()
+
+      checkErrorClass(
+        exception = intercept[SparkException] {
+          aggregated.count()
+        },
+        errorClass = "INVALID_BUCKET_FILE",
+        msg = "Invalid bucket file: .+",
+        matchMsg = true)
+    }
+  }
 }
 
 class FakeFileSystemSetPermission extends LocalFileSystem {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 0f71c028962..51d02f4a7c6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -860,10 +860,8 @@ class AdaptiveQueryExecSuite
         val error = intercept[SparkException] {
           aggregated.count()
         }
-        // TODO(SPARK-39163): Throw an exception w/ error class for an invalid 
bucket file
-        assert(error.getErrorClass === "INTERNAL_ERROR")
-        assert(error.getCause.toString contains "Invalid bucket file")
-        assert(error.getCause.getSuppressed.size === 0)
+        assert(error.getErrorClass === "INVALID_BUCKET_FILE")
+        assert(error.getMessage contains "Invalid bucket file")
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index c39edbc5860..fc7c4e5761b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.File
-import java.net.URI
-
 import scala.util.Random
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -37,7 +33,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
 class BucketedReadWithoutHiveSupportSuite
@@ -833,24 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
     }
   }
 
-  test("error if there exists any malformed bucket files") {
-    withTable("bucketed_table") {
-      df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new 
URI(spark.sessionState.conf.warehousePath).getPath
-      val tableDir = new File(warehouseFilePath, "bucketed_table")
-      Utils.deleteRecursively(tableDir)
-      df1.write.parquet(tableDir.getAbsolutePath)
-
-      val aggregated = spark.table("bucketed_table").groupBy("i").count()
-      val e = intercept[SparkException] {
-        aggregated.count()
-      }
-      // TODO(SPARK-39163): Throw an exception w/ error class for an invalid 
bucket file
-      assert(e.getErrorClass === "INTERNAL_ERROR")
-      assert(e.getCause.toString contains "Invalid bucket file")
-    }
-  }
-
   test("disable bucketing when the output doesn't contain all bucketing 
columns") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to