This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new cd4ae6e7452 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
cd4ae6e7452 is described below

commit cd4ae6e7452170422bdf14ab5e1957e61503904f
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Aug 8 11:06:21 2023 +0800

    [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
    
    ### What changes were proposed in this pull request?
    This PR changes the exception handling in the `unpackScalarScalaUDF` function in `SparkConnectPlanner` from checking the exception type at a fixed nesting level to using Guava `Throwables` to get the root cause and then checking the type of the root cause. This makes the handling compatible with the differences between Java versions.
    
    ### Why are the changes needed?
    The following failure occurred when testing `UDFClassLoadingE2ESuite` in the Java 17 daily test:
    
    https://github.com/apache/spark/actions/runs/5766913899/job/15635782831
    
    ```
    [info] UDFClassLoadingE2ESuite:
    [info] - update class loader after stubbing: new session *** FAILED *** (101 milliseconds)
    [info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57)
    ...
    [info] - update class loader after stubbing: same session *** FAILED *** (52 milliseconds)
    [info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73)
    ...
    ```
    
    After analysis, it was found that the exception stack generated on the server side differs between Java 8 and Java 17:
    
    - Java 8
    
    ```
    java.io.IOException: unexpected exception type
      at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
      at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2222)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
      at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
      at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
      at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
      at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
      at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
      at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:170)
      at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:183)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:227)
    Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
      at java.lang.Class.getDeclaredMethod(Class.java:2130)
      at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
      at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
      ... 31 more
    ```
    
    - Java 17
    
    ```
    java.lang.RuntimeException: Exception in SerializedLambda.readResolve
      at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:288)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:568)
      at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
      at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
      at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
      at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
      at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
      at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
      at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
      at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
      at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
      at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
      at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
      at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1517)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1552)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2573)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2500)
      at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2371)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
      at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
      at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
      at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
      at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:170)
      at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:183)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
      at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:227)
    Caused by: java.security.PrivilegedActionException: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
      at java.base/java.security.AccessController.doPrivileged(AccessController.java:573)
      at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:269)
      ... 36 more
    Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
      at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675)
      at java.base/java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:272)
      at java.base/java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:269)
      at java.base/java.security.AccessController.doPrivileged(AccessController.java:569)
      ... 37 more
    ```
    
    While their root exceptions are both `NoSuchMethodException`, the levels of nesting are different.
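    
    To make this concrete, here is a standalone sketch (illustrative only, not code from this PR: the `RootCauseDemo` object and the exception messages are made up) showing that Guava's `Throwables.getRootCause` resolves both shapes to the same root type:
    
    ```scala
    import com.google.common.base.Throwables
    
    object RootCauseDemo extends App {
      // Java 8 shape: IOException -> NoSuchMethodException (one level of nesting).
      val java8Style = new java.io.IOException(
        "unexpected exception type",
        new NoSuchMethodException("StubClassDummyUdf.$deserializeLambda$"))
    
      // Java 17 shape: RuntimeException -> PrivilegedActionException ->
      // NoSuchMethodException (two levels of nesting).
      val java17Style = new RuntimeException(
        "Exception in SerializedLambda.readResolve",
        new java.security.PrivilegedActionException(
          new NoSuchMethodException("StubClassDummyUdf.$deserializeLambda$")))
    
      // Both resolve to NoSuchMethodException regardless of nesting depth.
      assert(Throwables.getRootCause(java8Style).isInstanceOf[NoSuchMethodException])
      assert(Throwables.getRootCause(java17Style).isInstanceOf[NoSuchMethodException])
    }
    ```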
    
    We can add an exception check branch to make it compatible with Java 17, for example:
    
    ```scala
    case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
      throw new ClassNotFoundException(... ${e.getCause} ...)
    case e: RuntimeException
        if e.getCause != null && e.getCause.getCause.isInstanceOf[NoSuchMethodException] =>
      throw new ClassNotFoundException(... ${e.getCause.getCause} ...)
    ```
    
    But if a future Java version changes the nesting level of the exceptions again, this part of the code would have to be modified once more. Therefore, this PR instead fetches the root cause of the exception and performs the type check on the root cause, making the handling as universal as possible.
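    
    For reference, the root-cause lookup is just a walk down the `getCause` chain, which is why it does not depend on how deeply a given Java version nests the exception. A simplified sketch of the idea (Guava's actual `Throwables.getRootCause` additionally detects cycles in the cause chain):
    
    ```scala
    import scala.annotation.tailrec
    
    object RootCauseWalk {
      // Follow getCause until there is no further cause; the last
      // non-null throwable in the chain is the root cause.
      @tailrec
      def rootCause(t: Throwable): Throwable = t.getCause match {
        case null => t
        case cause => rootCause(cause)
      }
    }
    ```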
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Pass GitHub Actions
    - Manually check with Java 17
    
    ```
    java -version
    openjdk version "17.0.8" 2023-07-18 LTS
    OpenJDK Runtime Environment Zulu17.44+15-CA (build 17.0.8+7-LTS)
    OpenJDK 64-Bit Server VM Zulu17.44+15-CA (build 17.0.8+7-LTS, mixed mode, sharing)
    ```
    
    run
    
    ```
    build/sbt clean "connect-client-jvm/testOnly *UDFClassLoadingE2ESuite" -Phive
    ```
    
    Before
    
    ```
    [info] UDFClassLoadingE2ESuite:
    [info] - update class loader after stubbing: new session *** FAILED *** (60 milliseconds)
    [info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57)
    ...
    [info] - update class loader after stubbing: same session *** FAILED *** (15 milliseconds)
    [info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73)
    ...
    [info] Run completed in 9 seconds, 565 milliseconds.
    [info] Total number of tests run: 2
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 0, failed 2, canceled 0, ignored 0, pending 0
    [info] *** 2 TESTS FAILED ***
    [error] Failed tests:
    [error]   org.apache.spark.sql.connect.client.UDFClassLoadingE2ESuite
    [error] (connect-client-jvm / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
    ```
    
    After
    
    ```
    [info] UDFClassLoadingE2ESuite:
    [info] - update class loader after stubbing: new session (116 milliseconds)
    [info] - update class loader after stubbing: same session (41 milliseconds)
    [info] Run completed in 9 seconds, 781 milliseconds.
    [info] Total number of tests run: 2
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #42360 from LuciferYang/unpackScalarScalaUDF-exception-java17.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
    (cherry picked from commit 25053d98186489d9f2061c9b815a5a33f7e309c4)
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index bfdc6f6271c..d59d01b4ce3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.connect.planner
 
-import java.io.IOException
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Try
 
+import com.google.common.base.Throwables
 import com.google.common.collect.{Lists, Maps}
 import com.google.protobuf.{Any => ProtoAny, ByteString}
 import io.grpc.{Context, Status, StatusRuntimeException}
@@ -1518,11 +1517,15 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
       Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
     } catch {
-      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
-        throw new ClassNotFoundException(
-          s"Failed to load class correctly due to ${e.getCause}. " +
-            "Make sure the artifact where the class is defined is installed by calling" +
-            " session.addArtifact.")
+      case t: Throwable =>
+        Throwables.getRootCause(t) match {
+          case nsm: NoSuchMethodException =>
+            throw new ClassNotFoundException(
+              s"Failed to load class correctly due to $nsm. " +
+                "Make sure the artifact where the class is defined is installed by calling" +
+                " session.addArtifact.")
+          case _ => throw t
+        }
     }
   }
 

