This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f47ac43e55 [SPARK-42327][SQL] Assign name to `_LEGACY_ERROR_TEMP_2177`
6f47ac43e55 is described below

commit 6f47ac43e55c332f63876cf4f8ecf1b41b277651
Author: itholic <haejoon....@databricks.com>
AuthorDate: Mon Feb 13 21:33:53 2023 +0500

    [SPARK-42327][SQL] Assign name to `_LEGACY_ERROR_TEMP_2177`
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to assign the name `MALFORMED_RECORD_IN_PARSING` to 
`_LEGACY_ERROR_TEMP_2177` and improve the error message.
    
    ### Why are the changes needed?
    
    We should assign proper names to LEGACY errors and show actionable error
messages.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, but it includes error message improvements.
    
    ### How was this patch tested?
    
    Updated UTs.
    
    Closes #39980 from itholic/LEGACY_2177.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 11 ++--
 .../sql/catalyst/util/FailureSafeParser.scala      |  3 +-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  6 +-
 .../expressions/JsonExpressionsSuite.scala         |  8 ++-
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   |  9 ++-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 66 ++++++++++++++++------
 .../sql/execution/datasources/csv/CSVSuite.scala   |  6 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++--
 .../spark/sql/hive/thriftserver/CliSuite.scala     |  4 +-
 .../ThriftServerWithSparkContextSuite.scala        |  4 +-
 10 files changed, 95 insertions(+), 44 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index e96383399d2..e329932acf1 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1063,6 +1063,12 @@
       "Malformed Protobuf messages are detected in message deserialization. 
Parse Mode: <failFastMode>. To process malformed protobuf message as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
     ]
   },
+  "MALFORMED_RECORD_IN_PARSING" : {
+    "message" : [
+      "Malformed records are detected in record parsing: <badRecord>.",
+      "Parse Mode: <failFastMode>. To process malformed records as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
+    ]
+  },
   "MISSING_AGGREGATION" : {
     "message" : [
       "The non-aggregating expression <expression> is based on columns which 
are not participating in the GROUP BY clause.",
@@ -4414,11 +4420,6 @@
       "Cannot create array with <numElements> elements of data due to 
exceeding the limit <maxRoundedArrayLength> elements for ArrayData. 
<additionalErrorMessage>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2177" : {
-    "message" : [
-      "Malformed records are detected in record parsing. Parse Mode: 
<failFastMode>. To process malformed records as null result, try setting the 
option 'mode' as 'PERMISSIVE'."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2178" : {
     "message" : [
       "Remote operations not supported."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 5a9e52a51a2..fcdcd21b6dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -65,7 +65,8 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e)
+          throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+            toResultRow(e.partialResult(), e.record).toString, e)
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 4134da135e3..fd3809ccd31 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1743,10 +1743,12 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
         "additionalErrorMessage" -> additionalErrorMessage))
   }
 
-  def malformedRecordsDetectedInRecordParsingError(e: BadRecordException): 
Throwable = {
+  def malformedRecordsDetectedInRecordParsingError(
+      badRecord: String, e: BadRecordException): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2177",
+      errorClass = "MALFORMED_RECORD_IN_PARSING",
       messageParameters = Map(
+        "badRecord" -> badRecord,
         "failFastMode" -> FailFastMode.name),
       cause = e)
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index beeb01619aa..a1db7e4c3ab 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -446,9 +446,11 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
         InternalRow(null)
       )
     }.getCause
-    assert(exception.isInstanceOf[SparkException])
-    assert(exception.getMessage.contains(
-      "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST"))
+    checkError(
+      exception = exception.asInstanceOf[SparkException],
+      errorClass = "MALFORMED_RECORD_IN_PARSING",
+      parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST")
+    )
   }
 
   test("from_json - input=array, schema=array, output=array") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index fcdc40404e7..67ba5511263 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -304,9 +304,12 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
       val exception1 = intercept[SparkException] {
         df.select(from_csv($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
-      }.getMessage
-      assert(exception1.contains(
-        "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+      }.getCause
+      checkError(
+        exception = exception1.asInstanceOf[SparkException],
+        errorClass = "MALFORMED_RECORD_IN_PARSING",
+        parameters = Map("badRecord" -> "[null,null,\"]", "failFastMode" -> 
"FAILFAST")
+      )
 
       val exception2 = intercept[SparkException] {
         df.select(from_csv($"value", schema, Map("mode" -> "DROPMALFORMED")))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 57c54e88229..f2e0fd57738 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -749,9 +749,14 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
       val exception1 = intercept[SparkException] {
         df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
-      }.getMessage
-      assert(exception1.contains(
-        "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+      }.getCause
+      checkError(
+        exception = exception1.asInstanceOf[SparkException],
+        errorClass = "MALFORMED_RECORD_IN_PARSING",
+        parameters = Map(
+          "badRecord" -> "[null,null,{\"a\" 1, \"b\": 11}]",
+          "failFastMode" -> "FAILFAST")
+      )
 
       val exception2 = intercept[SparkException] {
         df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")))
@@ -778,10 +783,15 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
       val exception = intercept[SparkException] {
         df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
-      }
+      }.getCause
 
-      assert(exception.getMessage.contains(
-        "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+      checkError(
+        exception = exception.asInstanceOf[SparkException],
+        errorClass = "MALFORMED_RECORD_IN_PARSING",
+        parameters = Map(
+          "badRecord" -> "[null,11,{\"a\": \"1\", \"b\": 11}]",
+          "failFastMode" -> "FAILFAST")
+      )
       checkError(
         exception = 
ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException],
         errorClass = "CANNOT_PARSE_JSON_FIELD",
@@ -1107,15 +1117,25 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
         val exception1 = intercept[SparkException] {
           df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))("b")).collect()
-        }.getMessage
-        assert(exception1.contains(
-          "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+        }.getCause
+        checkError(
+          exception = exception1.asInstanceOf[SparkException],
+          errorClass = "MALFORMED_RECORD_IN_PARSING",
+          parameters = Map(
+            "badRecord" -> "[null,null]",
+            "failFastMode" -> "FAILFAST")
+        )
 
         val exception2 = intercept[SparkException] {
           df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))("a")).collect()
-        }.getMessage
-        assert(exception2.contains(
-          "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+        }.getCause
+        checkError(
+          exception = exception2.asInstanceOf[SparkException],
+          errorClass = "MALFORMED_RECORD_IN_PARSING",
+          parameters = Map(
+            "badRecord" -> "[null,null]",
+            "failFastMode" -> "FAILFAST")
+        )
       }
     }
   }
@@ -1131,15 +1151,25 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
         val exception1 = intercept[SparkException] {
           df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))("b")).collect()
-        }.getMessage
-        assert(exception1.contains(
-          "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+        }.getCause
+        checkError(
+          exception = exception1.asInstanceOf[SparkException],
+          errorClass = "MALFORMED_RECORD_IN_PARSING",
+          parameters = Map(
+            "badRecord" -> "[null]",
+            "failFastMode" -> "FAILFAST")
+        )
 
         val exception2 = intercept[SparkException] {
           df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))("a")).collect()
-        }.getMessage
-        assert(exception2.contains(
-          "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+        }.getCause
+        checkError(
+          exception = exception2.asInstanceOf[SparkException],
+          errorClass = "MALFORMED_RECORD_IN_PARSING",
+          parameters = Map(
+            "badRecord" -> "[null]",
+            "failFastMode" -> "FAILFAST")
+        )
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 44f1b2faceb..4cc971a05af 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3218,8 +3218,10 @@ class CSVv1Suite extends CSVSuite {
 
       checkError(
         exception = exception.getCause.asInstanceOf[SparkException],
-        errorClass = "_LEGACY_ERROR_TEMP_2177",
-        parameters = Map("failFastMode" -> "FAILFAST")
+        errorClass = "MALFORMED_RECORD_IN_PARSING",
+        parameters = Map(
+          "badRecord" -> "[2015,Chevy,Volt,null,null]",
+          "failFastMode" -> "FAILFAST")
       )
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index efc062e927c..5595a9670ac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1073,9 +1073,14 @@ abstract class JsonSuite
         .schema("a string")
         .json(corruptRecords)
         .collect()
-    }.getMessage
-    assert(exceptionTwo.contains(
-      "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+    }.getCause
+    checkError(
+      exception = exceptionTwo.asInstanceOf[SparkException],
+      errorClass = "MALFORMED_RECORD_IN_PARSING",
+      parameters = Map(
+        "badRecord" -> "[null]",
+        "failFastMode" -> "FAILFAST")
+    )
   }
 
   test("Corrupt records: DROPMALFORMED mode") {
@@ -1989,9 +1994,14 @@ abstract class JsonSuite
           .schema(schema)
           .json(path)
           .collect()
-      }
-      assert(exceptionTwo.getMessage.contains("Malformed records are detected 
in record " +
-        "parsing. Parse Mode: FAILFAST."))
+      }.getCause
+      checkError(
+        exception = exceptionTwo.asInstanceOf[SparkException],
+        errorClass = "MALFORMED_RECORD_IN_PARSING",
+        parameters = Map(
+          "badRecord" -> "[null]",
+          "failFastMode" -> "FAILFAST")
+      )
     }
   }
 
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 3ab6dcdd995..f73b1b8e68a 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -560,7 +560,7 @@ class CliSuite extends SparkFunSuite {
       extraArgs = Seq("--hiveconf", "hive.session.silent=false",
         "-e", "select from_json('a', 'a INT', map('mode', 'FAILFAST'));"),
       errorResponses = Seq("JsonParseException"))(
-      ("", "SparkException: Malformed records are detected in record parsing"),
+      ("", "SparkException: [MALFORMED_RECORD_IN_PARSING]"),
       ("", "JsonParseException: Unrecognized token 'a'"))
     // If it is in silent mode, will print the error message only
     runCliWithin(
@@ -568,7 +568,7 @@ class CliSuite extends SparkFunSuite {
       extraArgs = Seq("--conf", "spark.hive.session.silent=true",
         "-e", "select from_json('a', 'a INT', map('mode', 'FAILFAST'));"),
       errorResponses = Seq("SparkException"))(
-      ("", "SparkException: Malformed records are detected in record parsing"))
+      ("", "SparkException: [MALFORMED_RECORD_IN_PARSING]"))
   }
 
   test("SPARK-30808: use Java 8 time API in Thrift SQL CLI by default") {
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 0228f0ac6d2..0c8a1d1260e 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -69,7 +69,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       }
       assert(e.getMessage.contains("JsonParseException: Unrecognized token 
'a'"))
       assert(!e.getMessage.contains(
-        "SparkException: Malformed records are detected in record parsing"))
+        "SparkException: [MALFORMED_RECORD_IN_PARSING]"))
     }
 
     withJdbcStatement { statement =>
@@ -78,7 +78,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       }
       assert(e.getMessage.contains("JsonParseException: Unrecognized token 
'a'"))
       assert(e.getMessage.contains(
-        "SparkException: Malformed records are detected in record parsing"))
+        "SparkException: [MALFORMED_RECORD_IN_PARSING]"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to