Github user szhem commented on a diff in the pull request:
https://github.com/apache/spark/pull/23083#discussion_r236057101
--- Diff:
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
spills.clear()
forceSpillFiles.foreach(s => s.file.delete())
forceSpillFiles.clear()
- if (map != null || buffer != null) {
+ if (map != null || buffer != null || readingIterator != null) {
map = null // So that the memory can be garbage-collected
buffer = null // So that the memory can be garbage-collected
+ readingIterator = null // So that the memory can be garbage-collected
--- End diff --
@advancedxy I tried removing all the modifications except for this one
and got `OutOfMemoryError`s again. Here are the details:
1. Four `ExternalSorter` instances now remain:
2 of them are not closed ...

and 2 of them are closed ...

as expected.
2. Two `SpillableIterator`s, which consume a significant part of the
memory, also remain; they belong to already closed `ExternalSorter`s.

3. These `SpillableIterator`s are referenced by `CompletionIterator`s ...

... which in turn seem to be referenced by the `cur` field ...

... of the standard `Iterator`'s `flatMap`, which is used in the `compute`
method of `CoalescedRDD`.

The standard `Iterator`'s `flatMap` does not clear its `cur` field before
obtaining the next value for it, and producing that next value can itself
consume quite a lot of memory

... so in Spark's case the previous iterator, together with the memory it
holds, stays reachable while the next value is being fetched.

So I've restored the changes to `CompletionIterator` that reassign the
reference to its sub-iterator to the `empty` iterator ...

... and that has helped.
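
A minimal sketch of that idea, assuming a hypothetical wrapper named `ReleasingIterator` (it is not Spark's actual `CompletionIterator`, and the `onComplete` callback is an illustrative stand-in for its completion hook). Once the wrapped iterator is exhausted, the wrapper swaps its reference for `Iterator.empty`:

```scala
// Hypothetical wrapper for illustration; not Spark's CompletionIterator.
final class ReleasingIterator[A](private var sub: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {

  // Guards against running the completion callback more than once.
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
      // Drop the reference to the exhausted sub-iterator so that whatever
      // it holds (buffers, maps, ...) becomes eligible for garbage collection
      // even if this wrapper itself is still referenced elsewhere.
      sub = Iterator.empty
    }
    more
  }

  override def next(): A = sub.next()
}
```

With such a wrapper, a holder like `flatMap`'s `cur` field would keep only the small wrapper object reachable after exhaustion, not the memory-heavy iterator behind it.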
P.S. I believe that clearing the `cur` field of the standard `flatMap`
iterator before calling `nextCur` could help too:
```scala
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  private def nextCur() { cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
    // but slightly shorter bytecode (better JVM inlining!)
    while (!cur.hasNext) {
      cur = empty // proposed addition: release the exhausted iterator before fetching the next one
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  def next(): B = (if (hasNext) cur else empty).next()
}
```
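
To try the idea outside the standard library, here is a standalone sketch. The names `FlatMapSketch` and `flatMapReleasing` are hypothetical, and the function takes `A => Iterator[B]` rather than `GenTraversableOnce[B]` so that it does not depend on a particular Scala collections version:

```scala
object FlatMapSketch {
  // Hypothetical helper: behaves like Iterator.flatMap, but clears `cur`
  // before asking `f` for the next inner iterator.
  def flatMapReleasing[A, B](self: Iterator[A])(f: A => Iterator[B]): Iterator[B] =
    new Iterator[B] {
      private var cur: Iterator[B] = Iterator.empty

      def hasNext: Boolean = {
        while (!cur.hasNext) {
          // Release the exhausted inner iterator first, so it is no longer
          // reachable while f(...) builds the (possibly large) next one.
          cur = Iterator.empty
          if (!self.hasNext) return false
          cur = f(self.next())
        }
        true
      }

      def next(): B = (if (hasNext) cur else Iterator.empty).next()
    }
}
```

The results are the same as with the regular `flatMap`; the only intended difference is how long the previous inner iterator stays reachable.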