Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21369#discussion_r209480264
--- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
@@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
- test("external aggregation updates peak execution memory") {
+ test("SPARK-22713 spill during iteration leaks internal map") {
+ val size = 1000
+ val conf = createSparkConf(loadDefaults = true)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+ val map = createExternalMap[Int]
+
+ map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+ assert(map.numSpills == 0, "map was not supposed to spill")
+
+ val it = map.iterator
+ assert(it.isInstanceOf[CompletionIterator[_, _]])
+ // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns
+ // an instance of an anonymous Iterator class.
+
+ val underlyingMapRef = WeakReference(map.currentMap)
+
+ {
+ // direct asserts introduced some macro-generated code that held a reference to the map
+ val tmpIsNull = null == underlyingMapRef.get.orNull
+ assert(!tmpIsNull)
+ }
+
+ val first50Keys = for (_ <- 0 until 50) yield {
+ val (k, vs) = it.next()
+ val sortedVs = vs.sorted
+ assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+ k
+ }
+ assert(map.numSpills == 0)
+ map.spill(Long.MaxValue, null)
+ // these asserts try to show that we're no longer holding references to the underlying map.
+ // it'd be nice to use something like
+ // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+ // (lines 69-89)
+ // assert(map.currentMap == null)
+ eventually{
--- End diff ---
nit: add a space `eventually {`
---
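The comment block in the diff wishes for an AssertUtil-style unreachability assertion. Here is a minimal, self-contained sketch of that pattern, assuming ScalaTest 3.x (`Eventually` plus `SpanSugar`); the suite name and the byte-array payload are invented for illustration and are not part of the Spark test:

```scala
import scala.ref.WeakReference

import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

class WeakRefGcSketch extends AnyFunSuite with Eventually {
  test("weakly referenced object is collected once unreachable") {
    // Hold the payload through a var so the strong reference can be dropped.
    var strong: Array[Byte] = Array.fill(1 << 20)(0.toByte)
    val weak = WeakReference(strong)

    strong = null // drop the only strong reference

    // GC is non-deterministic: nudge it and retry until the referent is gone.
    eventually(timeout(10.seconds), interval(500.millis)) {
      System.gc()
      assert(weak.get.isEmpty, "referent should have been collected")
    }
  }
}
```

Because `eventually` retries its block until it stops throwing, the `System.gc()` hint and the assertion can sit together; a single `System.gc()` call is only a request to the JVM, never a guarantee.

Separately, the `isInstanceOf[CompletionIterator[_, _]]` assertion reflects the shape of the fix: the map's destructive iterator is wrapped so a cleanup callback can release the backing map once iteration finishes. A rough sketch of that wrapper idea (not Spark's actual `org.apache.spark.util.CompletionIterator`):

```scala
// Delegates to `sub` and runs `completion` exactly once after the last
// element has been consumed, e.g. to null out a reference to a backing
// map so it becomes eligible for GC.
class CompletionIteratorSketch[A](sub: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      completed = true
      completion()
    }
    r
  }

  override def next(): A = sub.next()
}
```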