rednaxelafx opened a new pull request #28577:
URL: https://github.com/apache/spark/pull/28577


   This is a backport of https://github.com/apache/spark/pull/28463 from Apache 
Spark master/3.0 to 2.4.
   Minor adaptation include:
   - Retain the Spark 2.4-specific behavior of skipping the indylambda check 
when using Scala 2.11
   - Remove unnecessary LMF restrictions in ClosureCleaner tests
   - Address review comments in the original PR from @kiszk 
   
   Tested with the default Scala 2.11 build, and also tested 
ClosureCleaner-related tests in Scala 2.12 build as well:
   - repl: `SingletonReplSuite`
   - core: `ClosureCleanerSuite` and `ClosureCleanerSuite2`
   
   ---
   
   ### What changes were proposed in this pull request?
   
   This PR proposes to enhance Spark's `ClosureCleaner` to support "indylambda" 
style of Scala closures to the same level as the existing implementation for 
the old (inner class) style ones. The goal is to reach feature parity with the 
support of the old style Scala closures, with as close to bug-for-bug 
compatibility as possible.
   
   Specifically, this PR addresses one lacking support for indylambda closures 
vs the inner class closures:
   - When a closure is declared in a Scala REPL and captures the enclosing REPL 
line object, such closure should be cleanable (unreferenced fields on the 
enclosing REPL line object should be cleaned)
   
   This PR maintains the same limitations in the new indylambda closure support 
as the old inner class closures, in particular the following two:
   - Cleaning is only available for one level of REPL line object. If a closure 
captures state from a REPL line object further out from the immediate enclosing 
one, it won't be subject to cleaning. See example below.
   - "Sibling" closures are not handled yet. A "sibling" closure is defined 
here as a closure that is directly or indirectly referenced by the starting 
closure, but isn't lexically enclosing. e.g.
     ```scala
     {
       val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, 
references `fieldA` on `this`.
       val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  
// captures `this` and `siblingClosure`, references `fieldB` on `this`.
     }
     ```
   
   The changes are intended to be minimal, with further code cleanups planned 
in separate PRs.
   
   Jargons:
   - old, inner class style Scala closures, aka `delambdafy:inline`: default in 
Scala 2.11 and before
   - new, "indylambda" style Scala closures, aka `delambdafy:method`: default 
in Scala 2.12 and later
   
   ### Why are the changes needed?
   
   There had been previous effortsto extend Spark's `ClosureCleaner` to support 
"indylambda" Scala closures, which is necessary for proper Scala 2.12 support. 
Most notably the work done for 
[SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).
   
   But the previous efforts had missed one import scenario: a Scala closure 
declared in a Scala REPL, and it captures the enclosing `this` -- a REPL line 
object. e.g. in a Spark Shell:
   ```scala
   :pa
   class NotSerializableClass(val x: Int)
   val ns = new NotSerializableClass(42)
   val topLevelValue = "someValue"
   val func = (j: Int) => {
     (1 to j).flatMap { x =>
       (1 to x).map { y => y + topLevelValue }
     }
   }
   <Ctrl+D>
   sc.parallelize(0 to 2).map(func).collect
   ```
   In this example, `func` refers to a Scala closure that captures the 
enclosing `this` because it needs to access `topLevelValue`, which is in turn 
implemented as a field on the enclosing REPL line object.
   
   The existing `ClosureCleaner` in Spark supports cleaning this case in Scala 
2.11-, and this PR brings feature parity to Scala 2.12+.
   
   Note that the existing cleaning logic only supported one level of REPL line 
object nesting. This PR does not go beyond that. When a closure references 
state declared a few commands earlier, the cleaning will fail in both Scala 
2.11 and Scala 2.12. e.g.
   ```scala
   scala> :pa
   // Entering paste mode (ctrl-D to finish)
   
   class NotSerializableClass1(val x: Int)
   case class Foo(id: String)
   val ns = new NotSerializableClass1(42)
   val topLevelValue = "someValue"
   
   // Exiting paste mode, now interpreting.
   
   defined class NotSerializableClass1
   defined class Foo
   ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
   topLevelValue: String = someValue
   
   scala> :pa
   // Entering paste mode (ctrl-D to finish)
   
   val closure2 = (j: Int) => {
     (1 to j).flatMap { x =>
       (1 to x).map { y => y + topLevelValue } // 2 levels
     }
   }
   
   // Exiting paste mode, now interpreting.
   
   closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>
   
   scala> sc.parallelize(0 to 2).map(closure2).collect
   org.apache.spark.SparkException: Task not serializable
   ...
   ```
   in the Scala 2.11 / Spark 2.4.x case:
   ```
   Caused by: java.io.NotSerializableException: NotSerializableClass1
   Serialization stack:
        - object not serializable (class: NotSerializableClass1, value: 
NotSerializableClass1@615b1baf)
        - field (class: $iw, name: ns, type: class NotSerializableClass1)
        - object (class $iw, $iw@64df3f4b)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@66e6e5e9)
        - field (class: $line14.$read, name: $iw, type: class $iw)
        - object (class $line14.$read, $line14.$read@c310aa3)
        - field (class: $iw, name: $line14$read, type: class $line14.$read)
        - object (class $iw, $iw@79224636)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@636d4cdc)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
   ```
   in the Scala 2.12 / Spark 2.4.x case after this PR:
   ```
   Caused by: java.io.NotSerializableException: NotSerializableClass1
   Serialization stack:
        - object not serializable (class: NotSerializableClass1, value: 
NotSerializableClass1@6f3b4c9a)
        - field (class: $iw, name: ns, type: class NotSerializableClass1)
        - object (class $iw, $iw@2945a3c1)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@152705d0)
        - field (class: $line14.$read, name: $iw, type: class $iw)
        - object (class $line14.$read, $line14.$read@7cf311eb)
        - field (class: $iw, name: $line14$read, type: class $line14.$read)
        - object (class $iw, $iw@d980dac)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@557d9532)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class $iw, 
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
 implementation=invokeStatic 
$anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;,
 
instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;,
 numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)
   ```
   
   For more background of the new and old ways Scala lowers closures to Java 
bytecode, please see [A note on how NSC (New Scala Compiler) lowers 
lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).
   
   For more background on how Spark's `ClosureCleaner` works and what's needed 
to make it support "indylambda" Scala closures, please refer to [A Note on 
Apache Spark's 
ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).
   
   #### tl;dr
   
   The `ClosureCleaner` works like a mark-sweep algorithm on fields:
   - Finding (a chain of) outer objects referenced by the starting closure;
   - Scanning the starting closure and its inner closures and marking the 
fields on the outer objects accessed;
   - Cloning the outer objects, nulling out fields that are not accessed by any 
closure of concern.
   
   ##### Outer Objects
   
   For the old, inner class style Scala closures, the "outer objects" is 
defined as the lexically enclosing closures of the starting closure, plus an 
optional enclosing REPL line object if these closures are defined in a Scala 
REPL. All of them are on a singly-linked `$outer` chain.
   
   For the new, "indylambda" style Scala closures, the capturing implementation 
changed, so closures no longer refer to their enclosing closures via an 
`$outer` chain. However, a closure can still capture its enclosing REPL line 
object, much like the old style closures. The name of the field that captures 
this reference would be `arg$1` (instead of `$outer`).
   
   So what's missing in the `ClosureCleaner` for the "indylambda" support is 
find and potentially clone+clean the captured enclosing `this` REPL line 
object. That's what this PR implements.
   
   ##### Inner Closures
   
   The old, inner class style of Scala closures are compiled into separate 
inner classes, one per lambda body. So in order to discover the implementation 
(bytecode) of the inner closures, one has to jump over multiple classes. The 
name of such a class would contain the marker substring `$anonfun$`.
   
   The new, "indylambda" style Scala closures are compiled into **static 
methods** in the class where the lambdas were declared. So for lexically nested 
closures, their lambda bodies would all be compiled into static methods **in 
the same class**. This makes it much easier to discover the implementation 
(bytecode) of the nested lambda bodies. The name of such a static method would 
contain the marker substring `$anonfun$`.
   
   Discovery of inner closures involves scanning bytecode for certain patterns 
that represent the creation of a closure object for the inner closure.
   - For inner class style: the closure object creation site is like `new 
<InnerClassForTheClosure>(captured args)`
   - For "indylambda" style: the closure object creation site would be compiled 
into an `invokedynamic` instruction, with its "bootstrap method" pointing to 
the same one used by Java 8 for its serializable lambdas, and with the 
bootstrap method arguments pointing to the implementation method.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not 
support Scala closures declared in a Scala REPL that captures anything from the 
REPL line objects. After this PR, such scenario is supported.
   
   ### How was this patch tested?
   
   Added new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The 
new test case fails without the fix in this PR, and pases with the fix.
   
   Closes #28463 from rednaxelafx/closure-cleaner-indylambda.
   
   Authored-by: Kris Mok <[email protected]>
   Signed-off-by: Wenchen Fan <[email protected]>
   (cherry picked from commit dc01b7556f74e4a9873ceb1f78bc7df4e2ab4a8a)
   Signed-off-by: Kris Mok <[email protected]>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to