Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/5685#discussion_r29391891
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -101,21 +115,124 @@ private[spark] object ClosureCleaner extends Logging
{
}
}
- def clean(func: AnyRef, checkSerializable: Boolean = true) {
+ /**
+ * Clean the given closure in place.
+ *
+ * More specifically, this renders the given closure serializable as long as it does not
+ * explicitly reference unserializable objects.
+ *
+ * @param closure the closure to clean
+ * @param checkSerializable whether to verify that the closure is serializable after cleaning
+ * @param cleanTransitively whether to clean enclosing closures transitively
+ */
+ def clean(
+ closure: AnyRef,
+ checkSerializable: Boolean = true,
+ cleanTransitively: Boolean = true): Unit = {
+ clean(closure, checkSerializable, cleanTransitively, Map.empty)
+ }
+
+ /**
+ * Helper method to clean the given closure in place.
+ *
+ * The mechanism is to traverse the hierarchy of enclosing closures and null out any
+ * references along the way that are not actually used by the starting closure, but are
+ * nevertheless included in the compiled anonymous classes. Note that it is unsafe to
+ * simply mutate the enclosing closures in place, as other code paths may depend on them.
+ * Instead, we clone each enclosing closure and set the parent pointers accordingly.
+ *
+ * By default, closures are cleaned transitively. This means we detect whether enclosing
+ * objects are actually referenced by the starting one, either directly or transitively,
+ * and, if not, sever these closures from the hierarchy. In other words, in addition to
+ * nulling out unused field references, we also null out any parent pointers that refer
+ * to enclosing objects not actually needed by the starting closure. We determine
+ * transitivity by tracing through the tree of all methods ultimately invoked by the
+ * inner closure and record all the fields referenced in the process.
+ *
+ * For instance, transitive cleaning is necessary in the following scenario:
+ *
+ *   class SomethingNotSerializable {
+ *     def someValue = 1
+ *     def someMethod(): Unit = scope("one") {
+ *       def x = someValue
+ *       def y = 2
+ *       scope("two") { println(y + 1) }
+ *     }
+ *     def scope(name: String)(body: => Unit) = body
+ *   }
+ *
+ * In this example, scope "two" is not serializable because it references scope "one", which
+ * references SomethingNotSerializable. Note that, however, the body of scope "two" does not
+ * actually depend on SomethingNotSerializable. This means we can safely null out the parent
+ * pointer of a cloned scope "one" and set it as the parent of scope "two", such that scope "two"
+ * no longer references SomethingNotSerializable transitively.
+ *
+ * @param func the starting closure to clean
+ * @param checkSerializable whether to verify that the closure is serializable after cleaning
+ * @param cleanTransitively whether to clean enclosing closures transitively
+ * @param accessedFields a map from a class to a set of its fields that are accessed by
+ * the starting closure
+ */
+ private def clean(
+ func: AnyRef,
+ checkSerializable: Boolean,
+ cleanTransitively: Boolean,
+ accessedFields: Map[Class[_], Set[String]]): Unit = {
+
+ // TODO: clean all inner closures first. This requires us to find the inner objects.
// TODO: cache outerClasses / innerClasses / accessedFields
- val outerClasses = getOuterClasses(func)
+
+ if (func == null) {
+ return
+ }
+
+ logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+ // A list of classes that represents closures enclosed in the given one
val innerClasses = getInnerClasses(func)
--- End diff --
Should these be called `innerClosureClasses` and `getInnerClosureClasses`?
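
For readers following the Scaladoc quoted in the diff, here is a minimal sketch of the parent-pointer idea it describes. It is not the ClosureCleaner implementation: `Owner`, `ScopeOne`, `ScopeTwo`, and `CleaningSketch` are hypothetical names that model the compiled closure classes by hand, and the bytecode analysis that decides which fields are actually used is skipped (the sketch simply assumes scope "two" never touches the owner).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical model of the scenario in the Scaladoc; none of these names exist in Spark.
object CleaningSketch {
  // Stands in for SomethingNotSerializable (deliberately not Serializable).
  final class Owner { def someValue: Int = 1 }

  // Stands in for scope "one": holds a parent pointer to the owner.
  final class ScopeOne(val outer: Owner) extends Serializable

  // Stands in for scope "two": holds a parent pointer to scope "one",
  // but its body never reads anything from the owner.
  final class ScopeTwo(var outer: ScopeOne) extends Serializable {
    def body(): Int = 2 + 1
  }

  // True if Java serialization accepts the whole object graph reachable from obj.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Assumption: we already know the owner is unused by scope "two".
  // Instead of mutating the existing ScopeOne (other code may still rely on it),
  // build a copy whose parent pointer is null and make it the parent of `two`.
  def clean(two: ScopeTwo): Unit = {
    val clonedOne = new ScopeOne(null)
    two.outer = clonedOne
  }
}
```

With this model, a `ScopeTwo` built on top of a `ScopeOne(new Owner)` fails the serializability check before `clean` and passes it afterwards, while the original `ScopeOne` still points at its owner.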