This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6165f316063 [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
6165f316063 is described below

commit 6165f31606344efdf35f060d07cee46b85948e38
Author: Rob Reeves <roree...@linkedin.com>
AuthorDate: Wed Jun 21 18:00:36 2023 +0300

    [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
    
    ### What changes were proposed in this pull request?
    This modifies the error message when a Scala UDF fails to execute by including the UDF name if it is available.
    
    ### Why are the changes needed?
    If multiple UDFs are defined in the same location with the same method signature, it can be hard to identify which UDF causes the issue. The function class alone does not give enough information. Adding the UDF name, if available, makes it easier to identify the exact problematic UDF.
    
    This is particularly helpful when the exception stack trace is not emitted due to a JVM performance optimization and codegen is enabled. Example in 3.1.1:
    ```
    Caused by: org.apache.spark.SparkException: Failed to execute user defined 
function(UDFRegistration$$Lambda$666/1969461119: (bigint, string) => string)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
            at 
org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:249)
            at 
org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:248)
            at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:131)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:131)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:523)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1535)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:526)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds the UDF name to the UDF failure error message. Before this 
change:
    > [FAILED_EXECUTE_UDF] Failed to execute user defined function (QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).
    
    After this change:
    > [FAILED_EXECUTE_UDF] Failed to execute user defined function (`nextChar (QueryExecutionErrorsSuite$$Lambda$970/181260145)`: (string, int) => string).
    
    ### How was this patch tested?
    Unit test added.
    
    Closes #41599 from robreeves/roreeves/roreeves/udf_error.
    
    Lead-authored-by: Rob Reeves <roree...@linkedin.com>
    Co-authored-by: Rob Reeves <robertpree...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/expressions/ScalaUDF.scala  |  6 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  4 +--
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 35 ++++++++++++++++++----
 .../spark/sql/hive/execution/HiveUDFSuite.scala    |  6 ++--
 4 files changed, 39 insertions(+), 12 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 137a8976a40..40274a83340 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -1168,7 +1168,7 @@ case class ScalaUDF(
          |  $funcInvocation;
          |} catch (Throwable e) {
          |  throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
-         |    "$funcCls", "$inputTypesString", "$outputType", e);
+         |    "$functionName", "$inputTypesString", "$outputType", e);
          |}
        """.stripMargin
 
@@ -1188,6 +1188,8 @@ case class ScalaUDF(
 
   private[this] val resultConverter = catalystConverter
 
+  private def functionName = udfName.map { uName => s"$uName ($funcCls)" 
}.getOrElse(funcCls)
+
   lazy val funcCls = Utils.getSimpleName(function.getClass)
   lazy val inputTypesString = 
children.map(_.dataType.catalogString).mkString(", ")
   lazy val outputType = dataType.catalogString
@@ -1198,7 +1200,7 @@ case class ScalaUDF(
     } catch {
       case e: Exception =>
         throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
-          funcCls, inputTypesString, outputType, e)
+          functionName, inputTypesString, outputType, e)
     }
 
     resultConverter(result)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index beedc749e30..df2116df8f2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -207,12 +207,12 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
       messageParameters = Map("typeName" -> (dataType + failure)))
   }
 
-  def failedExecuteUserDefinedFunctionError(funcCls: String, inputTypes: 
String,
+  def failedExecuteUserDefinedFunctionError(functionName: String, inputTypes: 
String,
       outputType: String, e: Throwable): Throwable = {
     new SparkException(
       errorClass = "FAILED_EXECUTE_UDF",
       messageParameters = Map(
-        "functionName" -> funcCls,
+        "functionName" -> toSQLId(functionName),
         "signature" -> inputTypes,
         "result" -> outputType),
       cause = e)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 73a3e088894..61b3610e64e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -396,22 +396,45 @@ class QueryExecutionErrorsSuite
       sqlState = "22023")
   }
 
+  test("FAILED_EXECUTE_UDF: execute user defined function with registered 
UDF") {
+    val luckyCharOfWord = udf { (word: String, index: Int) => {
+      word.substring(index, index + 1)
+    }}
+    spark.udf.register("luckyCharOfWord", luckyCharOfWord)
+
+    val e = intercept[SparkException] {
+      Seq(("Jacek", 5), ("Agata", 5), ("Sweet", 6))
+        .toDF("word", "index")
+        .createOrReplaceTempView("words")
+      spark.sql("select luckyCharOfWord(word, index) from words").collect()
+    }
+    assert(e.getCause.isInstanceOf[SparkException])
+
+    checkError(
+      exception = e.getCause.asInstanceOf[SparkException],
+      errorClass = "FAILED_EXECUTE_UDF",
+      parameters = Map(
+        "functionName" ->
+          "`luckyCharOfWord 
\\(QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+\\)`",
+        "signature" -> "string, int",
+        "result" -> "string"),
+      matchPVals = true)
+  }
+
   test("FAILED_EXECUTE_UDF: execute user defined function") {
     val luckyCharOfWord = udf { (word: String, index: Int) => {
       word.substring(index, index + 1)
     }}
-    val e1 = intercept[SparkException] {
+    val e = intercept[SparkException] {
       val words = Seq(("Jacek", 5), ("Agata", 5), ("Sweet", 6)).toDF("word", 
"index")
       words.select(luckyCharOfWord($"word", $"index")).collect()
     }
-    assert(e1.getCause.isInstanceOf[SparkException])
-
-    Utils.getSimpleName(luckyCharOfWord.getClass)
+    assert(e.getCause.isInstanceOf[SparkException])
 
     checkError(
-      exception = e1.getCause.asInstanceOf[SparkException],
+      exception = e.getCause.asInstanceOf[SparkException],
       errorClass = "FAILED_EXECUTE_UDF",
-      parameters = Map("functionName" -> 
"QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+",
+      parameters = Map("functionName" -> 
"`QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+`",
         "signature" -> "string, int",
         "result" -> "string"),
       matchPVals = true)
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 8fb9209f9cb..ef430f4b6a2 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -738,7 +738,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton 
with SQLTestUtils {
           e,
           "FAILED_EXECUTE_UDF",
           parameters = Map(
-            "functionName" -> s"${classOf[GenericUDFAssertTrue].getName}",
+            "functionName" ->
+              
"`org`.`apache`.`hadoop`.`hive`.`ql`.`udf`.`generic`.`GenericUDFAssertTrue`",
             "signature" -> "boolean",
             "result" -> "void"))
       }
@@ -768,7 +769,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton 
with SQLTestUtils {
           exception = 
intercept[SparkException](df.collect()).getCause.asInstanceOf[SparkException],
           errorClass = "FAILED_EXECUTE_UDF",
           parameters = Map(
-            "functionName" -> s"${classOf[SimpleUDFAssertTrue].getName}",
+            "functionName" ->
+              
"`org`.`apache`.`spark`.`sql`.`hive`.`execution`.`SimpleUDFAssertTrue`",
             "signature" -> "boolean",
             "result" -> "boolean"
           )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to