viirya commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r421155560



##########
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##########
@@ -414,6 +433,142 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // Internal (slash-separated) name of java.lang.invoke.LambdaMetafactory.
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // Name and JVM descriptor of the bootstrap method that Scala indylambda closures use.
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" +
+    "Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+    "Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Checks whether the given reference is an indylambda-style Scala closure.
+   *
+   * @param maybeClosure the closure to check.
+   * @return the closure's serialization proxy (SerializedLambda) if it is an indylambda Scala
+   *         closure; None otherwise, including when the proxy cannot be obtained.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+    val maybeClosureClass = maybeClosure.getClass
+
+    // Only implementations of a scala.FunctionN interface are treated as closure candidates.
+    def implementsScalaFunction: Boolean =
+      ClassUtils.getAllInterfaces(maybeClosureClass).asScala
+        .exists(_.getName.startsWith("scala.Function"))
+
+    maybeClosure match {
+      // Cheap checks first: indylambda closure classes are generated by Java's
+      // LambdaMetafactory and are always synthetic, and only serializable closures matter.
+      // Test against java.io.Serializable rather than an interface named
+      // "scala.Serializable": in Scala 2.13 scala.Serializable is just a type alias for
+      // java.io.Serializable, so no interface with that name exists at runtime, while in
+      // Scala 2.12 scala.Serializable extends java.io.Serializable.
+      case _: java.io.Serializable
+          if maybeClosureClass.isSynthetic && implementsScalaFunction =>
+        try {
+          Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
+        } catch {
+          case e: Exception =>
+            // No explicit debug-level check is needed: the Spark logging API already covers it.
+            logDebug("The given reference is not an indylambda Scala closure.", e)
+            None
+        }
+      case _ => None
+    }
+  }
+
+  /**
+   * Decides from a lambda's serialization proxy whether it is a Scala indylambda closure:
+   * the implementation must be a static method whose name contains `$anonfun$`.
+   * A further check that the closure implements a scala.runtime.java8 functional interface
+   * is not performed here.
+   */
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+    val isStaticImpl = lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic
+    isStaticImpl && lambdaProxy.getImplMethodName.contains("$anonfun$")
+  }
+
+  /**
+   * Obtains a lambda's serialization proxy by reflectively calling the `writeReplace` method
+   * declared on its class (made accessible first) and casting the result to SerializedLambda.
+   * Throws NoSuchMethodException if no such method is declared, and ClassCastException if the
+   * returned object is not a SerializedLambda.
+   */
+  def inspect(closure: AnyRef): SerializedLambda = {
+    val replaceMethod = closure.getClass.getDeclaredMethod("writeReplace")
+    replaceMethod.setAccessible(true)
+    val proxy = replaceMethod.invoke(closure)
+    proxy.asInstanceOf[SerializedLambda]
+  }
+
+  /**
+   * Scans an indylambda Scala closure, along with its lexically nested 
closures, and populate
+   * the accessed fields info on which fields on the outer object are accessed.
+   */
+  def findAccessedFields(
+      lambdaProxy: SerializedLambda,
+      lambdaClassLoader: ClassLoader,
+      accessedFields: Map[Class[_], Set[String]]): Unit = {
+    val implClassInternalName = lambdaProxy.getImplClass
+    // scalastyle:off classforname
+    val implClass = Class.forName(
+      implClassInternalName.replace('/', '.'), false, lambdaClassLoader)
+    // scalastyle:on classforname
+    val implClassNode = new ClassNode()
+    val implClassReader = ClosureCleaner.getClassReader(implClass)
+    implClassReader.accept(implClassNode, 0)
+
+    val methodsByName = Map.empty[MethodIdentifier[_], MethodNode]
+    for (m <- implClassNode.methods.asScala) {
+      methodsByName(MethodIdentifier(implClass, m.name, m.desc)) = m
+    }
+
+    val implMethodId = MethodIdentifier(
+      implClass, lambdaProxy.getImplMethodName, 
lambdaProxy.getImplMethodSignature)
+
+    val visited = Set[MethodIdentifier[_]](implMethodId)
+    val stack = Stack[MethodIdentifier[_]](implMethodId)
+    while (!stack.isEmpty) {
+      val currentId = stack.pop
+      val currentMethodNode = methodsByName(currentId)
+      logTrace(s"  scanning $currentId")
+      currentMethodNode.accept(new MethodVisitor(ASM7) {
+        override def visitFieldInsn(op: Int, owner: String, name: String, 
desc: String): Unit = {
+          if (op == GETFIELD || op == PUTFIELD) {
+            val ownerExternalName = owner.replace('/', '.')
+            for (cl <- accessedFields.keys if cl.getName == ownerExternalName) 
{
+              logTrace(s"    found field access $name on $owner")

Review comment:
       Would logging `ownerExternalName` instead of `owner` be more readable?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to