zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278375346


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that 
should be stubbed during
+          |the Scala UDF serde and execution if not found on the server 
classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   The stub class loader would currently be used for all `withSession` calls in drivers, and for all task runs in executors.
   Perhaps we should restrict stubbing to UDF class loading in drivers, and use a more aggressive default, e.g. "org, com".
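
   A rough sketch of what such a more aggressive default would mean in terms of the loader (the prefix list and the parent class loader here are illustrative assumptions, not the PR's actual default):

   ```scala
   import org.apache.spark.util.StubClassLoader

   // Hypothetical aggressive default: stub any missing class under "org" or "com".
   val prefixes = "org,com".split(",").map(_.trim).toSeq
   val loader = StubClassLoader(getClass.getClassLoader, prefixes)

   // A missing class whose binary name starts with one of the prefixes is
   // replaced by a generated stub; any other missing class still fails with
   // ClassNotFoundException.
   ```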



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1504,15 +1506,24 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
   }
 
   private def unpackUdf(fun: proto.CommonInlineUserDefinedFunction): UdfPacket = {
-    Utils.deserialize[UdfPacket](
-      fun.getScalarScalaUdf.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[UdfPacket](fun.getScalarScalaUdf)
   }
 
   private def unpackForeachWriter(fun: proto.ScalarScalaUDF): ForeachWriterPacket = {
-    Utils.deserialize[ForeachWriterPacket](
-      fun.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[ForeachWriterPacket](fun)
+  }
+
+  private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
+    try {
+      logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
+      Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
+    } catch {
+      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
+        throw new ClassNotFoundException(
+          s"Failed to load class correctly due to ${e.getCause}. " +
+            "Make sure the artifact where the class is defined is installed by 
calling" +
+            " session.addArtifact.")

Review Comment:
   > wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config
   
   The code you highlighted would not catch that class, because the case you describe would fail with a `ClassNotFoundException` rather than a `NoSuchMethodException`.
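
   To illustrate the two failure modes (a minimal sketch; `com.example.Missing` is a hypothetical class absent from the server classpath):

   ```scala
   import org.apache.spark.util.StubClassLoader

   // Case 1: the prefix list does not cover the class. The loader refuses to
   // stub it, so deserialization surfaces a plain ClassNotFoundException.
   val strict = StubClassLoader(getClass.getClassLoader, Seq("org.apache.spark"))
   // strict.loadClass("com.example.Missing")  // throws ClassNotFoundException

   // Case 2: the prefix list covers the class. A stub is generated and loaded;
   // only if the lambda actually needs a method on it does deserialization
   // fail, with an IOException whose cause is a NoSuchMethodException -- the
   // case handled by unpackScalarScalaUDF above.
   val stubbing = StubClassLoader(getClass.getClassLoader, Seq("com.example"))
   val clazz = stubbing.loadClass("com.example.Missing")  // yields a stub class
   ```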



##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will
+ * only do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not
+ * need the class and is therefore safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {

Review Comment:
   When a user actually uses a class, it normally looks like `val clazz = new Clazz(); clazz.callMethod()`. When this happens, it fails earlier, while looking up the method, before we ever come here (i.e. before the stub throws the error from its constructor at runtime).
   
   Throwing an error from the constructor only helps if the user calls `val clazz = new Clazz()` and then never uses the class afterwards.
   
   If you ask why we do not stub the methods the user would call and throw the error there: the reason is that it is too hard :) We would need to scan the UDF contents. The `NoSuchMethodException` handling in `SparkConnectPlanner` is good enough to throw the error for us.
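
   In other words, a stub behaves roughly like the hand-written equivalent below (a sketch; the real bytecode comes from `generateStub`, and the class name and error message here are illustrative):

   ```scala
   // Roughly the behaviour of a generated stub: the class loads fine, but
   // constructing it fails fast at runtime.
   class MissingClassStub {
     throw new ClassNotFoundException(
       "Hit a stubbed class; the real class was not found on the classpath.")
   }

   // `val clazz = new MissingClassStub()` fails right here, which covers the
   // construct-but-never-use case. When the class is genuinely used, execution
   // never gets this far: lambda deserialization fails first with the
   // NoSuchMethodException handled in SparkConnectPlanner.
   ```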



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

