zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278375346
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
.version("3.5.0")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+ ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+ .internal()
+    .doc("""
+      |Comma-separated list of binary names of classes/packages that should be stubbed during
+      |the Scala UDF serde and execution if not found on the server classpath.
+      |An empty list effectively disables stubbing for all missing classes.
+      |By default, the server stubs classes from the Scala client package.
+      |""".stripMargin)
Review Comment:
The stub class loader is currently used for all `withSession` calls in drivers, and for all task runs in executors.
Perhaps we should restrict stubbing to UDF class loading in drivers, paired with a more aggressive default, e.g. `"org,com"`.
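As a hedged sketch (not from the PR) of how a comma-separated value such as the suggested aggressive default would split into stub prefixes; the actual parsing done by `ConfigBuilder` may differ, and `parsePrefixes`/`shouldStub` here are hypothetical helpers:

```scala
// Hypothetical parsing of the comma-separated config value into stub prefixes.
def parsePrefixes(confValue: String): Seq[String] =
  confValue.split(",").map(_.trim).filter(_.nonEmpty).toSeq

val prefixes = parsePrefixes("org, com")

// A class would then be a stub candidate when its binary name starts with any prefix.
def shouldStub(name: String): Boolean = prefixes.exists(p => name.startsWith(p))
```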
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1504,15 +1506,24 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
   }
   private def unpackUdf(fun: proto.CommonInlineUserDefinedFunction): UdfPacket = {
-    Utils.deserialize[UdfPacket](
-      fun.getScalarScalaUdf.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[UdfPacket](fun.getScalarScalaUdf)
   }
   private def unpackForeachWriter(fun: proto.ScalarScalaUDF): ForeachWriterPacket = {
-    Utils.deserialize[ForeachWriterPacket](
-      fun.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[ForeachWriterPacket](fun)
+  }
+
+
+  private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
+    try {
+      logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
+      Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
+    } catch {
+      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
+        throw new ClassNotFoundException(
+          s"Failed to load class correctly due to ${e.getCause}. " +
+            "Make sure the artifact where the class is defined is installed by calling" +
+            " session.addArtifact.")
Review Comment:
> wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config

The code you highlighted would not catch that class, because the case you describe fails with a `ClassNotFoundException` rather than a `NoSuchMethodException`.
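To illustrate the distinction (the exception types come from the diff above; `classify` is a hypothetical stand-in for the planner's catch clause, not Spark code), the clause only fires for an `IOException` whose cause is a `NoSuchMethodException`, while a genuinely missing class surfaces as `ClassNotFoundException` and is not rewritten:

```scala
import java.io.IOException

// Hypothetical model of the catch clause in unpackScalarScalaUDF: only a
// NoSuchMethodException wrapped in an IOException gets the addArtifact hint.
def classify(t: Throwable): String = t match {
  case io: IOException if io.getCause.isInstanceOf[NoSuchMethodException] =>
    "rewrapped as ClassNotFoundException with the addArtifact hint"
  case _: ClassNotFoundException =>
    "not caught by the clause above"
  case _ =>
    "propagated unchanged"
}
```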
##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will
+ * only do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not
+ * need the class and is therefore safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {
Review Comment:
When a user actually uses a class, it is normally something like `val clazz = new Clazz(); clazz.callMethod()`. In that case, deserialization fails earlier with the `NoSuchMethodException` raised while resolving the method, before we ever reach the stub (which throws its error from the constructor at runtime).
Throwing an error from the constructor only helps if the user calls `val clazz = new Clazz()` and never uses the class afterwards.
If you ask why we do not stub the methods the user would call and throw the error there: because it is too hard :) We would need to scan the UDF contents. The `NoSuchMethodException` handling in `SparkConnectPlanner` is good enough to throw the error for us.
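To illustrate the point above with a hand-written stand-in (the real `generateStub` emits equivalent bytecode with ASM; `GeneratedStub` here is purely illustrative): loading or referencing a stub class is harmless, and only instantiating it throws.

```scala
// Hand-written stand-in for a generated stub: the constructor body throws,
// so the class can be loaded and referenced, but never instantiated.
class GeneratedStub {
  throw new ClassNotFoundException(
    "Hit a stubbed class; install the real artifact via session.addArtifact.")
}

// Referencing the class (as deserializing an unused capture would) is fine.
val loaded = classOf[GeneratedStub]
```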
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]