Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5685#discussion_r29391891
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
    @@ -101,21 +115,124 @@ private[spark] object ClosureCleaner extends Logging {
         }
       }
     
    -  def clean(func: AnyRef, checkSerializable: Boolean = true) {
    +  /**
    +   * Clean the given closure in place.
    +   *
    +   * More specifically, this renders the given closure serializable as long as it does not
    +   * explicitly reference unserializable objects.
    +   *
    +   * @param closure the closure to clean
    +   * @param checkSerializable whether to verify that the closure is serializable after cleaning
    +   * @param cleanTransitively whether to clean enclosing closures transitively
    +   */
    +  def clean(
    +      closure: AnyRef,
    +      checkSerializable: Boolean = true,
    +      cleanTransitively: Boolean = true): Unit = {
    +    clean(closure, checkSerializable, cleanTransitively, Map.empty)
    +  }
    +
    +  /**
    +   * Helper method to clean the given closure in place.
    +   *
    +   * The mechanism is to traverse the hierarchy of enclosing closures and null out any
    +   * references along the way that are not actually used by the starting closure, but are
    +   * nevertheless included in the compiled anonymous classes. Note that it is unsafe to
    +   * simply mutate the enclosing closures in place, as other code paths may depend on them.
    +   * Instead, we clone each enclosing closure and set the parent pointers accordingly.
    +   *
    +   * By default, closures are cleaned transitively. This means we detect whether enclosing
    +   * objects are actually referenced by the starting one, either directly or transitively,
    +   * and, if not, sever these closures from the hierarchy. In other words, in addition to
    +   * nulling out unused field references, we also null out any parent pointers that refer
    +   * to enclosing objects not actually needed by the starting closure. We determine
    +   * transitivity by tracing through the tree of all methods ultimately invoked by the
    +   * inner closure and record all the fields referenced in the process.
    +   *
    +   * For instance, transitive cleaning is necessary in the following scenario:
    +   *
    +   *   class SomethingNotSerializable {
    +   *     def someValue = 1
    +   *     def someMethod(): Unit = scope("one") {
    +   *       def x = someValue
    +   *       def y = 2
    +   *       scope("two") { println(y + 1) }
    +   *     }
    +   *     def scope(name: String)(body: => Unit) = body
    +   *   }
    +   *
    +   * In this example, scope "two" is not serializable because it references scope "one", which
    +   * references SomethingNotSerializable. Note that, however, the body of scope "two" does not
    +   * actually depend on SomethingNotSerializable. This means we can safely null out the parent
    +   * pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two"
    +   * no longer references SomethingNotSerializable transitively.
    +   *
    +   * @param func the starting closure to clean
    +   * @param checkSerializable whether to verify that the closure is serializable after cleaning
    +   * @param cleanTransitively whether to clean enclosing closures transitively
    +   * @param accessedFields a map from a class to a set of its fields that are accessed by
    +   *                       the starting closure
    +   */
    +  private def clean(
    +      func: AnyRef,
    +      checkSerializable: Boolean,
    +      cleanTransitively: Boolean,
    +      accessedFields: Map[Class[_], Set[String]]): Unit = {
    +
    +    // TODO: clean all inner closures first. This requires us to find the inner objects.
         // TODO: cache outerClasses / innerClasses / accessedFields
    -    val outerClasses = getOuterClasses(func)
    +
    +    if (func == null) {
    +      return
    +    }
    +
    +    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}}) +++")
    +
    +    // A list of classes that represents closures enclosed in the given one
         val innerClasses = getInnerClasses(func)
    --- End diff --
    
    Should these be called `innerClosureClasses` and `getInnerClosureClasses`?
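
For readers following the doc comment in the diff, the "clone the enclosing closure and null out its parent pointer" idea can be sketched with a simplified model. This is only an illustration under stated assumptions: `NotSerializableOwner`, `ScopeOne`, `ScopeTwo`, `isSerializable` and `cleanSketch` are hypothetical names standing in for the compiled closure classes, and none of this is the actual `ClosureCleaner` code, which works on compiler-generated fields via reflection.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SomethingNotSerializable in the doc comment.
class NotSerializableOwner { val someValue = 1 }

// Models scope "one": keeps a parent pointer to its enclosing object.
class ScopeOne(var outer: NotSerializableOwner) extends Serializable with Cloneable {
  override def clone(): ScopeOne = super.clone().asInstanceOf[ScopeOne]
}

// Models scope "two": keeps a parent pointer to scope "one" but only uses y.
class ScopeTwo(var outer: ScopeOne, val y: Int) extends Serializable {
  def run(): Int = y + 1
}

object CleanSketch {
  // True if Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Clone the enclosing closure, sever the clone's unused parent pointer,
  // then re-parent the starting closure onto the clone. The original
  // ScopeOne instance is left untouched for other code paths.
  def cleanSketch(two: ScopeTwo): Unit = {
    val clonedOne = two.outer.clone()
    clonedOne.outer = null
    two.outer = clonedOne
  }
}
```

In this model the assumption that scope "two" never touches its owner is hard-coded; the real cleaner has to establish that by tracing accessed fields, as the doc comment describes.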

