rednaxelafx commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r420784636



##########
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##########
@@ -414,6 +433,142 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda uses for its bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" +
+    "Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+    "Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is an indylambda style Scala closure.
+   * If so, return a non-empty serialization proxy (SerializedLambda) of the closure;
+   * otherwise return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+    val maybeClosureClass = maybeClosure.getClass
+
+    // shortcut the fast check:
+    // indylambda closure classes are generated by Java's LambdaMetafactory, and they're always
+    // synthetic.
+    if (!maybeClosureClass.isSynthetic) return None
+
+    val implementedInterfaces = ClassUtils.getAllInterfaces(maybeClosureClass).asScala
+    val isClosureCandidate = implementedInterfaces.exists(_.getName == "scala.Serializable") &&
+      implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
+
+    if (isClosureCandidate) {
+      try {
+        val lambdaProxy = inspect(maybeClosure)
+        if (isIndylambdaScalaClosure(lambdaProxy)) Option(lambdaProxy)
+        else None
+      } catch {
+        case e: Exception =>
+          // no need to check if debug is enabled here; the Spark logging api covers this
+          logDebug("The given reference is not an indylambda Scala closure.", e)
+          None
+      }
+    } else {
+      None
+    }
+  }
+
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+    lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
+      lambdaProxy.getImplMethodName.contains("$anonfun$")
+      // && implements a scala.runtime.java8 functional interface
+  }
+
+  def inspect(closure: AnyRef): SerializedLambda = {
+    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")

Review comment:
   Not a copy, but a "serialization proxy" object that holds on to the symbolic information of the original closure object.
   
   The original closure object contains too much runtime-specific information (e.g. class loader info, reference to the runtime-generated lambda class) that's not suitable for serialization. So the JDK uses a "serialization proxy" that only holds on to the symbolic info for serialization purposes.
   
   Spark's `ClosureCleaner` abuses this serialization proxy for introspection into what's inside the black box.
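   
   To make the proxy idea concrete, here is a minimal sketch (not Spark's API: the `SerializationProxyDemo` object and the `serializationProxyOf` helper are invented names for illustration) of pulling the `java.lang.invoke.SerializedLambda` out of a lambda through the generated `writeReplace` method, assuming a Scala 2.12+ lambda compiled in indylambda form:
   
   ```scala
   import java.lang.invoke.SerializedLambda
   
   // Illustrative only; assumes Scala 2.12+ (indylambda) lambdas and is not part of Spark's API.
   object SerializationProxyDemo {
     // Mirror what ClosureCleaner's `inspect` does: call the private, generated
     // `writeReplace` method reflectively to obtain the serialization proxy.
     def serializationProxyOf(closure: AnyRef): SerializedLambda = {
       val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
       writeReplace.setAccessible(true)
       writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
     }
   
     def main(args: Array[String]): Unit = {
       val addOne: Int => Int = x => x + 1  // an indylambda-style Scala closure
       val proxy = serializationProxyOf(addOne)
       // Only symbolic information survives in the proxy: capturing class,
       // impl method name/signature, impl method kind, etc.
       println(proxy.getCapturingClass)       // e.g. SerializationProxyDemo$
       println(proxy.getImplMethodName)       // e.g. $anonfun$main$1
       println(proxy.getImplMethodSignature)  // e.g. (I)I
       println(proxy.getImplMethodKind)       // 6 == MethodHandleInfo.REF_invokeStatic
     }
   }
   ```
   
   The `getSerializationProxy` method in the diff above is essentially this reflective call, guarded by the synthetic-class and `scala.Serializable`/`scala.Function*` checks, plus the `REF_invokeStatic` and `$anonfun$` filter in `isIndylambdaScalaClosure`.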



