Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/5685#discussion_r29393859
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -126,34 +243,66 @@ private[spark] object ClosureCleaner extends Logging {
}
}
+ // List of outer (class, object) pairs, ordered from outermost to
innermost
+ // Note that all outer objects but the outermost one (first one in
this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip
outerObjects).reverse
- var outer: AnyRef = null
+ var parent: AnyRef = null
if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
// The closure is ultimately nested inside a class; keep the object
of that
// class without cloning it since we don't want to clone the user's
objects.
- outer = outerPairs.head._2
+ // Note that we still need to keep around the outermost object
itself because
+ // we need it to clone its child closure later (see below).
+ logDebug(s" + outermost object is not a closure, so do not clone it:
${outerPairs.head}")
+ parent = outerPairs.head._2 // e.g. SparkContext
outerPairs = outerPairs.tail
+ } else if (outerPairs.size > 0) {
+ logDebug(s" + outermost object is a closure, so we just keep it:
${outerPairs.head}")
+ } else {
+ logDebug(" + there are no enclosing objects!")
}
+
// Clone the closure objects themselves, nulling out any fields that
are not
// used in the closure we're working on or any of its inner closures.
for ((cls, obj) <- outerPairs) {
- outer = instantiateClass(cls, outer, inInterpreter)
+ logDebug(s" + cloning the object $obj of class ${cls.getName}")
+ // We null out these unused references by cloning each object and
then filling in all
+ // required fields from the original object. We need the parent here
because the Java
+ // language specification requires the first constructor parameter
of any closure to be
+ // its enclosing object.
+ val clone = instantiateClass(cls, parent, inInterpreter)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
- // logInfo("1: Setting " + fieldName + " on " + cls + " to " +
value);
- field.set(outer, value)
+ field.set(clone, value)
}
+ // If transitive cleaning is enabled, we recursively clean any
enclosing closure using
+ // the already populated accessed fields map of the starting closure
+ if (cleanTransitively && isClosure(clone.getClass)) {
+ logDebug(s" + cleaning cloned closure $clone recursively
(${cls.getName})")
+ clean(clone, checkSerializable, cleanTransitively, accessedFields)
--- End diff --
Should `checkSerializable` be false here? It seems like, theoretically, we
only need to check the very final closure. It's a bit hard for me to reason
about whether we could end up in a weird state where `checkSerializable` throws
false-positive errors during recursion.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]