GitHub user skonto commented on a diff in the pull request:
https://github.com/apache/spark/pull/21930#discussion_r207009194
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends
Logging {
return
}
- logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
- // A list of classes that represents closures enclosed in the given one
- val innerClasses = getInnerClosureClasses(func)
-
- // A list of enclosing objects and their respective classes, from
innermost to outermost
- // An outer object at a given index is of type outer class at the same
index
- val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
- // For logging purposes only
- val declaredFields = func.getClass.getDeclaredFields
- val declaredMethods = func.getClass.getDeclaredMethods
-
- if (log.isDebugEnabled) {
- logDebug(" + declared fields: " + declaredFields.size)
- declaredFields.foreach { f => logDebug(" " + f) }
- logDebug(" + declared methods: " + declaredMethods.size)
- declaredMethods.foreach { m => logDebug(" " + m) }
- logDebug(" + inner classes: " + innerClasses.size)
- innerClasses.foreach { c => logDebug(" " + c.getName) }
- logDebug(" + outer classes: " + outerClasses.size)
- outerClasses.foreach { c => logDebug(" " + c.getName) }
- logDebug(" + outer objects: " + outerObjects.size)
- outerObjects.foreach { o => logDebug(" " + o) }
- }
+ if(lambdaFunc.isEmpty) {
+ logDebug(s"+++ Cleaning closure $func (${func.getClass.getName})
+++")
+
+ // A list of classes that represents closures enclosed in the given
one
+ val innerClasses = getInnerClosureClasses(func)
+
+ // A list of enclosing objects and their respective classes, from
innermost to outermost
+ // An outer object at a given index is of type outer class at the
same index
+ val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+ // For logging purposes only
+ val declaredFields = func.getClass.getDeclaredFields
+ val declaredMethods = func.getClass.getDeclaredMethods
+
+ if (log.isDebugEnabled) {
+ logDebug(" + declared fields: " + declaredFields.size)
+ declaredFields.foreach { f => logDebug(" " + f) }
+ logDebug(" + declared methods: " + declaredMethods.size)
+ declaredMethods.foreach { m => logDebug(" " + m) }
+ logDebug(" + inner classes: " + innerClasses.size)
+ innerClasses.foreach { c => logDebug(" " + c.getName) }
+ logDebug(" + outer classes: " + outerClasses.size)
+ outerClasses.foreach { c => logDebug(" " + c.getName) }
+ logDebug(" + outer objects: " + outerObjects.size)
+ outerObjects.foreach { o => logDebug(" " + o) }
+ }
- // Fail fast if we detect return statements in closures
- getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
-
- // If accessed fields is not populated yet, we assume that
- // the closure we are trying to clean is the starting one
- if (accessedFields.isEmpty) {
- logDebug(s" + populating accessed fields because this is the
starting closure")
- // Initialize accessed fields with the outer classes first
- // This step is needed to associate the fields to the correct
classes later
- initAccessedFields(accessedFields, outerClasses)
-
- // Populate accessed fields by visiting all fields and methods
accessed by this and
- // all of its inner closures. If transitive cleaning is enabled,
this may recursively
- // visits methods that belong to other classes in search of
transitively referenced fields.
- for (cls <- func.getClass :: innerClasses) {
- getClassReader(cls).accept(new FieldAccessFinder(accessedFields,
cleanTransitively), 0)
+ // Fail fast if we detect return statements in closures
+ getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
+ // If accessed fields is not populated yet, we assume that
+ // the closure we are trying to clean is the starting one
+ if (accessedFields.isEmpty) {
+ logDebug(s" + populating accessed fields because this is the
starting closure")
+ // Initialize accessed fields with the outer classes first
+ // This step is needed to associate the fields to the correct
classes later
+ initAccessedFields(accessedFields, outerClasses)
+
+ // Populate accessed fields by visiting all fields and methods
accessed by this and
+ // all of its inner closures. If transitive cleaning is enabled,
this may recursively
+ // visits methods that belong to other classes in search of
transitively referenced fields.
+ for (cls <- func.getClass :: innerClasses) {
+ getClassReader(cls).accept(new FieldAccessFinder(accessedFields,
cleanTransitively), 0)
+ }
}
- }
- logDebug(s" + fields accessed by starting closure: " +
accessedFields.size)
- accessedFields.foreach { f => logDebug(" " + f) }
-
- // List of outer (class, object) pairs, ordered from outermost to
innermost
- // Note that all outer objects but the outermost one (first one in
this list) must be closures
- var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip
outerObjects).reverse
- var parent: AnyRef = null
- if (outerPairs.size > 0) {
- val (outermostClass, outermostObject) = outerPairs.head
- if (isClosure(outermostClass)) {
- logDebug(s" + outermost object is a closure, so we clone it:
${outerPairs.head}")
- } else if (outermostClass.getName.startsWith("$line")) {
- // SPARK-14558: if the outermost object is a REPL line object, we
should clone and clean it
- // as it may carray a lot of unnecessary information, e.g. hadoop
conf, spark conf, etc.
- logDebug(s" + outermost object is a REPL line object, so we clone
it: ${outerPairs.head}")
+ logDebug(s" + fields accessed by starting closure: " +
accessedFields.size)
+ accessedFields.foreach { f => logDebug(" " + f) }
+
+ // List of outer (class, object) pairs, ordered from outermost to
innermost
+ // Note that all outer objects but the outermost one (first one in
this list) must be closures
+ var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip
outerObjects).reverse
--- End diff --
OK, I can do that.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]