[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21369 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209499005

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
    --- End diff --

    @cloud-fan, do you think this is worth doing? I'm referring to `CompletionIterator` delaying GC of the sub-iterator and the cleanup function (usually a closure that refers to a larger collection).
    If so, I'll open a separate JIRA and PR for it.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480323 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert(it.isInstanceOf[CompletionIterator[_, _]]) +// org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns +// an instance of an annonymous Iterator class. + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(map.numSpills == 0) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +// assert(map.currentMap == null) +eventually{ + System.gc() + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(tmpIsNull) +} + + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) + + +val keys = it.map{ + case (k, vs) => +val sortedVs = vs.sorted +assert(sortedVs.seq == (0 until 10).map(10 * k + _)) +k +} +.toList +.sorted + +assert(it.isEmpty) +assert(keys == (0 until 100)) + +assert(map.numSpills == 0) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) + +eventually{ --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209480313

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
             true
           }
         }

    +    private def destroy() : Unit = {
    +      freeCurrentMap()
    +      upstream = Iterator.empty
    +    }
    +
    +    def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = {
    +      CompletionIterator[(K, C), SpillableIterator](this, this.destroy )
    --- End diff --

    nit: no space before `)`
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480264 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert(it.isInstanceOf[CompletionIterator[_, _]]) +// org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns +// an instance of an annonymous Iterator class. + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(map.numSpills == 0) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +// assert(map.currentMap == null) +eventually{ --- End diff -- nit: add a space `eventually {` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209480296

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
             true
           }
         }

    +    private def destroy() : Unit = {
    --- End diff --

    nit: no space before `:`
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209480190

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -568,13 +573,14 @@ class ExternalAppendOnlyMap[K, V, C](
         context.addTaskCompletionListener[Unit](context => cleanup())
       }
    -  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
    +  /**
    +   * Exposed for testing
    +   */
    +  private[collection] class SpillableIterator(var upstream: Iterator[(K, C)])
    --- End diff --

    ditto
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209480155

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -114,7 +117,10 @@ class ExternalAppendOnlyMap[K, V, C](
       private val keyComparator = new HashComparator[K]
       private val ser = serializer.newInstance()
    -  @volatile private var readingIterator: SpillableIterator = null
    +  /**
    +   * Exposed for testing
    +   */
    +  @volatile private[collection] var readingIterator: SpillableIterator = null
    --- End diff --

    The test does not access this, so it does not need to be exposed.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r198816624

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    @cloud-fan ?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r192631230

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    @cloud-fan, can we move forward with this?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r191655937

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    @cloud-fan, how do you suggest we proceed with this?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r191077231

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    @cloud-fan, I've tried the `System.gc` approach with and without `eventually`, to no avail.
    It seems that scalatest's asserts use some black-magic, macro-based voodoo to generate a local method or val (I gave up when I got about 10 classes deep into my quest).

    Basically, what I'm seeing in VisualVM is the expected reference via the `WeakReference`, plus another, surprising one via `UnaryMacroBool.left`. It seems this sneaky reference was generated by the following assertion: `assert(map.currentMap == null)`, but it could easily have been generated elsewhere.

    @cloud-fan, @gatorsmile, can you please confirm whether, and how, we can import the Scala code? Otherwise, can you think of an alternative approach for testing this?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user som-snytt commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r191064975

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
             true
           }
         }

    +    private def destroy() : Unit = {
    +      freeCurrentMap()
    +      upstream = Iterator.empty
    --- End diff --

    +1
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r191050461

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    Anyway, `eventually` is available in Spark tests; we can use it if needed.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r191050447

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    `AccumulatorSuite` has a similar pattern and it works well (search for `System.gc`). Maybe we don't need `eventually`?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190829341

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    This requires using something like [scalatest's eventually](http://doc.scalatest.org/1.8/org/scalatest/concurrent/Eventually.html), don't you think?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190822967

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    --- End diff --

    How about this?
    ```
    val mapRef = new WeakReference(map.currentMap)
    ...
    // trigger spilling
    System.gc()
    assert(mapRef.get == null)
    ```
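The weak-reference check suggested here can be sketched outside Spark. This is only a minimal illustration, not the test that was merged: `GcProbe` and `clearedAfterGc` are hypothetical names, and the sketch assumes a JVM on which `System.gc()` usually, but not necessarily, triggers a collection, which is why it retries a bounded number of times rather than calling `System.gc()` once.

```scala
import java.lang.ref.WeakReference

// Hypothetical helper, not part of Spark or scalatest.
object GcProbe {

  /**
   * Requests a GC up to `attempts` times and reports whether the referent of
   * `ref` was cleared. System.gc() is only a hint to the JVM, so a `false`
   * result is not proof that a strong reference still exists.
   */
  def clearedAfterGc(ref: WeakReference[_ <: AnyRef], attempts: Int = 20): Boolean = {
    var i = 0
    while (ref.get() != null && i < attempts) {
      System.gc()
      Thread.sleep(10)
      i += 1
    }
    ref.get() == null
  }

  def main(args: Array[String]): Unit = {
    var payload: Array[Byte] = new Array[Byte](1 << 20)
    val ref = new WeakReference[Array[Byte]](payload)

    println(s"cleared while strongly held: ${clearedAfterGc(ref, attempts = 3)}")
    println(s"payload size: ${payload.length}") // keeps `payload` reachable up to here

    payload = null // drop the only strong reference
    println(s"cleared after release: ${clearedAfterGc(ref)}")
  }
}
```

Because the helper returns a plain `Boolean`, a test can assert on that value alone, so the assertion expression never mentions the referent itself.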
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190584765

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
             true
           }
         }

    +    private def destroy() : Unit = {
    +      freeCurrentMap()
    +      upstream = Iterator.empty
    +    }
    +
    +    private[ExternalAppendOnlyMap]
    --- End diff --

    It's weird to see a class-private method. I'd suggest just removing `private[ExternalAppendOnlyMap]`. `spill` is only called in `ExternalAppendOnlyMap`, and it's public.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190583904 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert(it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) --- End diff -- `assert(underlyingIt != null)`, we should not put space around. can you fix all of them? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190542635 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- hmm, we can in line 508 but not in this test. 
In this test we look at the iterator immediately after a spill; at that point `upstream` is supposed to have been replaced by a `DiskMapIterator`. I guess we can check for this directly (after relaxing its visibility to package-private). In line 508, we can simply compare with `Iterator.empty`.
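The upstream swap discussed in this thread can be illustrated with a toy iterator. This is a sketch under loud assumptions: `ToySpillableIterator` is a hypothetical class, and the `Vector` it drains into only stands in for the spill file that Spark's `spillMemoryIteratorToDisk` would write. It mirrors the order of operations in the diff: drain the old upstream, assert that it is exhausted, then replace the reference so the old iterator (and whatever it captured) is no longer reachable through this object.

```scala
// Hypothetical toy model; the Vector stands in for an on-disk spill file.
final class ToySpillableIterator[A](private var upstream: Iterator[A]) extends Iterator[A] {

  private var spilled = false

  def hasSpilled: Boolean = spilled

  /** Moves the remaining elements to the stand-in spill store; returns false if already spilled. */
  def spill(): Boolean = synchronized {
    if (spilled) {
      false
    } else {
      val drained = upstream.toVector // stand-in for writing the remaining entries to disk
      assert(!upstream.hasNext)       // the old upstream must be fully consumed
      upstream = drained.iterator     // drop the reference to the old upstream
      spilled = true
      true
    }
  }

  override def hasNext: Boolean = synchronized(upstream.hasNext)

  override def next(): A = synchronized(upstream.next())
}
```

A caller can consume a few elements, call `spill()`, and keep iterating; the remaining elements arrive in the same order, which is what the "first 50 keys, spill, next 50 keys" structure of the test above checks against the real class.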
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190494153 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- can we simply check `assert(underlyingIt.upstream eq Iterator.empty)`? 
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190375595

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
    --- End diff --

    @cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
    The `destroy` method first removes the reference held via the upstream iterator, then delegates to the method that clears the reference held by the external map's member (`currentMap`, I think), so unless we've missed another reference we should be fine.

    As I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator`, which keeps holding references via its `sub` and `completionFunction` members. These can prevent some objects from being collected, and they could be released once the iterator is exhausted. There might be more 'candidates', such as `LazyIterator` and `InterruptibleIterator`; I think this deserves more investigation.
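The idea of releasing the sub-iterator and the completion callback on exhaustion can be sketched as follows. This is not Spark's `CompletionIterator`; `ReleasingCompletionIterator` and its field names are hypothetical, and the sketch only illustrates the concern raised above: once the wrapped iterator is exhausted, the wrapper no longer needs to keep it or the cleanup closure reachable.

```scala
// Hypothetical wrapper, written only to illustrate the idea in this comment.
final class ReleasingCompletionIterator[A](
    private var sub: Iterator[A],
    private var onComplete: () => Unit) extends Iterator[A] {

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && onComplete != null) {
      val callback = onComplete
      // Drop both references before running the callback, so that neither the
      // sub-iterator nor the closure stays reachable through this wrapper.
      onComplete = null
      sub = Iterator.empty
      callback()
    }
    more
  }

  override def next(): A = sub.next()
}
```

The callback runs at most once because the field is nulled before it is invoked, and later calls to `hasNext` are answered by the shared empty iterator rather than by the original one.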
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190372506

    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
    +  test("spill during iteration") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    +
    +    map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
    +    assert(map.numSpills == 0, "map was not supposed to spill")
    +
    +    val it = map.iterator
    +    assert( it.isInstanceOf[CompletionIterator[_, _]])
    --- End diff --

    @cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
    The `destroy` method first removes the reference held via the upstream iterator, then delegates to the method that clears the reference held by the external map's member (`currentMap`, I think), so unless we've missed another reference we should be fine.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r190371425

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
           } else {
             logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
               s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    -        nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        val nextUpstream = spillMemoryIteratorToDisk(upstream)
    +        assert(!upstream.hasNext)
             hasSpilled = true
    +        upstream = nextUpstream
             true
           }
         }

    +    private def destroy() : Unit = {
    +      freeCurrentMap()
    +      upstream = Iterator.empty
    +    }
    +
    +    private[ExternalAppendOnlyMap]
    --- End diff --

    Hmm... the class itself is private (slightly relaxed to package-private to ease testing), so I'm not sure what the benefit of making the method public is. In any case, I think that once we see a use case for making this method public, we'd probably have to change the iterator and external map classes further.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190370842 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { --- End diff -- :+1: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190370765 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- the underlying map's iterator is an anonymous class, this is the best I could come up with to check if the upstream iterator holds a ref to the underlying map. @cloud-fan , do you have a better idea (I'm not 100% happy with this one) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190284699 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- `destroy` re-assigns `upstream`, once `destroy` is called, we should be fine? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190286827 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- we want to prove we are no longer holding the reference, why do we check type here? 
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190280317 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +private[ExternalAppendOnlyMap] --- End diff -- it's pretty reasonable to have this method public. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190279240 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { --- End diff -- let's also put the jira number in the test name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190285086 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) --- End diff -- nit: no space after `assert(` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189939603 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- `destructiveIterator` should just return a destructive iterator (especially for the map buffer), as its name implies; it is none of `CompletionIterator`'s business. Developers should also be free to define the completion function for the returned destructive iterator, in case we want a different one somewhere else in the future. > Your suggested code does exactly the same but is less streamlined I don't think this little change will have a big impact on how streamlined the code is. > and relies on an intermediate value (fortunately it's already a member variable) The current fix leads to this, not me. And even if this variable were not a member variable, we could define a temporary local variable. It's not a big deal.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189921783 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { --- End diff -- This test was written BEFORE the actual fix, and it did fail until the fix was in place. I do agree it's a bit clumsy and that potential future changes may break the original intention of the test. I've referred to a potential testing approach (currently limited to Scala's source code) which couldn't be (easily) applied to this code base, so I made a best effort to test this. I agree this needs better documentation; I'll start by referring to the issue in the test's name and will also add comments to the code.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189919617 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty --- End diff -- Safer, class remains usable if for some reason hasNext is called again, and this costs absolutely nothing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
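To illustrate the point about `Iterator.empty` versus `null`, here is a minimal sketch. `ReleasingIterator` and `ReleasingIteratorDemo` are hypothetical names made up for this example and are not Spark's `SpillableIterator`; the sketch only assumes that `destroy()` swaps the upstream reference the way the diff does.

```scala
// Hypothetical stand-in for the iterator under review; not Spark's actual class.
final class ReleasingIterator[A](initial: Iterator[A]) extends Iterator[A] {
  // The only reference this wrapper keeps to the original source.
  private var upstream: Iterator[A] = initial

  def hasNext: Boolean = upstream.hasNext
  def next(): A = upstream.next()

  // Drop the reference to the original source while keeping the wrapper usable:
  // a later hasNext simply returns false instead of throwing a NullPointerException,
  // which is what would happen if upstream were set to null.
  def destroy(): Unit = {
    upstream = Iterator.empty
  }
}

object ReleasingIteratorDemo {
  // Returns the first element read and the result of hasNext after destroy().
  def run(): (Int, Boolean) = {
    val it = new ReleasingIterator(Iterator(1, 2, 3))
    val first = it.next()
    it.destroy()
    (first, it.hasNext)
  }
}
```

With `null` instead of `Iterator.empty`, the `hasNext` call after `destroy()` would need an explicit null check to stay safe.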
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189919031 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- What behavior does it change? Your suggested code does exactly the same but is less streamlined and relies on an intermediate value (fortunately it's already a member variable).
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189894423 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { --- End diff -- I understand what this test wants to do, but it seems code without this PR could also pass it if everything goes normally. I know it's a little hard to reflect the change in a unit test. Also, I'd prefer to leave some comments explaining the potential memory leak in the source code above.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189892444 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- This changes the original behavior of `destructiveIterator`. I'd prefer to do it like this: ``` CompletionIterator[(K, C), Iterator[(K, C)]]( destructiveIterator(currentMap.iterator), readingIterator.destroy) ``` which keeps compatibility with the current code and does not introduce an unnecessary function.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189892547 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty --- End diff -- Why `empty`, not `null`?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794281 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794046 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order -private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator( - currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap()) +private val sortedMap = destructiveIterator( + currentMap.destructiveSortedIterator(keyComparator)) --- End diff -- unfortunately no, scala-style enforces a max of 100 chars per line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794097 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { --- End diff -- yes, fixing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189768075 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order -private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator( - currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap()) +private val sortedMap = destructiveIterator( + currentMap.destructiveSortedIterator(keyComparator)) --- End diff -- These two lines can be merged into one line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189768116 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { --- End diff -- Should be private as `freeCurrentMap` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189768301 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -630,7 +643,7 @@ private[spark] object ExternalAppendOnlyMap { } /** - * A comparator which sorts arbitrary keys based on their hash codes. + * A comparator which sorts arbitrary keys bas on their hash codes. --- End diff -- typo? bas -> based. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189768335 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -23,8 +23,9 @@ import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils +import org.apache.spark.util.CompletionIterator -class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { +class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ --- End diff -- Note the space. `LocalSparkContext {` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189768271 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { --- End diff -- I'd prefer private for this method --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189452094 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- @jerrylead, I'd appreciate it if you could test this. One more thing that bugs me is that there's another case where the iterator no longer needs the upstream iterator/underlying map but still holds a reference to it: when the iterator reaches EOF, its hasNext method returns false, which causes the wrapping CompletionIterator to call the cleanup function, which simply nulls the underlying map member variable in ExternalAppendOnlyMap. The iterator member is not nulled out, so we end up with the CompletionIterator holding two paths to the upstream iterator, which leads to the underlying map: first, it still holds a reference to the iterator itself; second, it still holds a reference to the cleanup closure, which refers to the ExternalAppendOnlyMap, which still refers to the current iterator, which refers to upstream... This can be solved in one of two ways: the simple way, when creating the completion iterator, provide a closure referring to the iterator, not the ExternalAppendOnlyMap; or the thorough way, modify the completion iterator to null out references after cleaning up. Having said that, I'm not sure how long a completed iterator may be 'sitting' before being discarded, so I'm not sure if this is worth fixing, especially using the thorough approach.
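The "thorough way" mentioned in this comment could look roughly like the sketch below. This is not Spark's `CompletionIterator`; `ReleasingCompletionIterator` and the demo object are hypothetical names, and the only assumption is that the wrapper runs the cleanup once and then lets go of both the sub-iterator and the cleanup closure.

```scala
// Hypothetical sketch of a completion iterator that releases its references
// after cleanup; Spark's own CompletionIterator is a different class.
final class ReleasingCompletionIterator[A](sub: Iterator[A], cleanup: () => Unit)
    extends Iterator[A] {
  // The constructor parameters are only used in these initializers, so the
  // two vars below are the only references held once construction finishes.
  private var underlying: Iterator[A] = sub
  private var onComplete: () => Unit = cleanup
  private var completed = false

  def hasNext: Boolean = {
    val more = !completed && underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
      // Release both paths to whatever the sub-iterator and closure captured.
      underlying = Iterator.empty
      onComplete = null
    }
    more
  }

  def next(): A = underlying.next()
}

object ReleasingCompletionIteratorDemo {
  // Returns the drained elements, the number of cleanup calls,
  // and the result of a second hasNext after completion.
  def run(): (List[Int], Int, Boolean) = {
    var cleanups = 0
    val it = new ReleasingCompletionIterator[Int](Iterator(1, 2, 3), () => cleanups += 1)
    val drained = it.toList
    val again = it.hasNext // must not trigger the cleanup a second time
    (drained, cleanups, again)
  }
}
```

The "simple way" would leave the wrapper untouched and only change which object the cleanup closure captures.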
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user JerryLead commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189451809 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- Thanks for fixing this issue. I think the potential solution is to change the `upstream` reference, but I have not tested whether this change is sufficient and safe.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189438351 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- Basically yes. According to my understanding of the code, this should have happened on the subsequent hasNext/next call. However, according to the analysis in the JIRA, the iterator kept holding this reference. My guess: at this point the entire program started suffering lengthy GC pauses that made it behave as if it were deadlocked, effectively leaving the ref in place (just a guess).
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189438190 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- Does the change mean we should reassign `upstream` (which eliminates the reference to `currentMap`) **immediately** after spilling, since otherwise we may hit OOM (e.g. if `readNext()` is never called after the spill; is this the real cause of the JIRA issue)?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
GitHub user eyalfa opened a pull request: https://github.com/apache/spark/pull/21369 [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during iteration ## What changes were proposed in this pull request? This PR solves [SPARK-22713](https://issues.apache.org/jira/browse/SPARK-22713), which describes a memory leak that occurs when an ExternalAppendOnlyMap is spilled during iteration (as opposed to insertion). ExternalAppendOnlyMap's iterator supports spilling, but it kept a reference to the internal map (via an internal iterator) after spilling. It seems that the original code was actually supposed to 'get rid' of this reference on the next iteration, but according to the elaborate investigation described in the JIRA this didn't happen. The fix simply replaces the internal iterator immediately after spilling. ## How was this patch tested? I've introduced a new test in the test suite ExternalAppendOnlyMapSuite; this test asserts that neither the external map itself nor its iterator holds any reference to the internal map after a spill. This approach required relaxing access to some member variables and nested classes of ExternalAppendOnlyMap; these members are now package-private and annotated with @VisibleForTesting. You can merge this pull request into a Git repository by running: $ git pull https://github.com/eyalfa/spark SPARK-22713__ExternalAppendOnlyMap_effective_spill Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21369 commit 1c4a6af2077e7ac50476101031aa1af99ff4f7b7 Author: Eyal Farago Date: 2018-05-18T22:06:15Z SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test. 
commit 82591e63bf30fad37b90956c226101e428d39787 Author: Eyal Farago Date: 2018-05-18T22:21:33Z SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by removing the reference to the initial iterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
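The idea behind the fix described in this pull request, replacing the internal iterator immediately after a spill, can be sketched in a simplified form. `SpillableSketch` and `SpillDuringIterationDemo` are hypothetical names, and the "disk" here is just an in-memory `Vector` standing in for the real spill file; none of this is Spark's actual implementation.

```scala
// Simplified, hypothetical model of an iterator that can spill mid-iteration.
final class SpillableSketch[A](inMemory: Iterator[A]) extends Iterator[A] {
  private var upstream: Iterator[A] = inMemory
  private var hasSpilled = false

  // Returns true if this call spilled, false if a spill already happened.
  def spill(): Boolean = synchronized {
    if (hasSpilled) {
      false
    } else {
      // Stand-in for writing the remaining entries to disk and reading them back.
      val buffer = Vector.newBuilder[A]
      while (upstream.hasNext) buffer += upstream.next()
      val nextUpstream = buffer.result().iterator
      assert(!upstream.hasNext)
      hasSpilled = true
      // Swap right away, so the old in-memory iterator (and whatever it
      // references) is unreachable even if next() is never called again.
      upstream = nextUpstream
      true
    }
  }

  def hasNext: Boolean = upstream.hasNext
  def next(): A = upstream.next()
}

object SpillDuringIterationDemo {
  // Reads two elements, spills twice, then drains the rest.
  def run(): (List[Int], Boolean, Boolean) = {
    val it = new SpillableSketch[Int](Iterator(1, 2, 3, 4, 5))
    val beforeSpill = List(it.next(), it.next())
    val firstSpill = it.spill()
    val secondSpill = it.spill()
    val afterSpill = it.toList
    (beforeSpill ++ afterSpill, firstSpill, secondSpill)
  }
}
```

Deferring the swap to the next `hasNext`/`next` call would leave the old iterator reachable for as long as the consumer does not advance, which is the situation the JIRA analysis describes.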