[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21369


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209499005
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@cloud-fan, do you think this is worth doing? I'm referring to the CompletionIterator delaying GC of the sub iterator and of the cleanup function (usually a closure referring to a larger collection).
If so, I'd open a separate JIRA + PR for it.
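
For reference, here's a minimal sketch of the idea, assuming nothing about Spark's actual `CompletionIterator` internals (the class name and members below are illustrative only):

```scala
// Hypothetical sketch: a completion iterator that drops its references once exhausted,
// so neither the sub iterator nor the cleanup closure keeps a large collection reachable.
class ReleasingCompletionIterator[A](
    private var sub: Iterator[A],
    private var completion: () => Unit) extends Iterator[A] {

  override def hasNext: Boolean = {
    if (sub == null) {
      false
    } else if (sub.hasNext) {
      true
    } else {
      completion()       // run the cleanup exactly once
      sub = null         // release the upstream iterator
      completion = null  // release the (possibly large) cleanup closure
      false
    }
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted iterator")
    sub.next()
  }
}
```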


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480323
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert(it.isInstanceOf[CompletionIterator[_, _]])
+// 
org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns
+// an instance of an anonymous Iterator class.
+
+val underlyingMapRef = WeakReference(map.currentMap)
+
+{
+  // direct asserts introduced some macro generated code that held a 
reference to the map
+  val tmpIsNull = null == underlyingMapRef.get.orNull
+  assert(!tmpIsNull)
+}
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(map.numSpills == 0)
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+// assert(map.currentMap == null)
+eventually{
+  System.gc()
+  // direct asserts introduced some macro generated code that held a 
reference to the map
+  val tmpIsNull = null == underlyingMapRef.get.orNull
+  assert(tmpIsNull)
+}
+
+
+val next50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(!it.hasNext)
+val keys = (first50Keys ++ next50Keys).sorted
+assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val underlyingMapRef = WeakReference(map.currentMap)
+
+{
+  // direct asserts introduced some macro generated code that held a 
reference to the map
+  val tmpIsNull = null == underlyingMapRef.get.orNull
+  assert(!tmpIsNull)
+}
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+
+
+val keys = it.map{
+  case (k, vs) =>
+val sortedVs = vs.sorted
+assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+k
+}
+.toList
+.sorted
+
+assert(it.isEmpty)
+assert(keys == (0 until 100))
+
+assert(map.numSpills == 0)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+
+eventually{
--- End diff --

ditto


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480313
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+def toCompletionIterator: CompletionIterator[(K, C), 
SpillableIterator] = {
+  CompletionIterator[(K, C), SpillableIterator](this, this.destroy )
--- End diff --

nit: no space before `)`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480264
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert(it.isInstanceOf[CompletionIterator[_, _]])
+// 
org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns
+// an instance of an anonymous Iterator class.
+
+val underlyingMapRef = WeakReference(map.currentMap)
+
+{
+  // direct asserts introduced some macro generated code that held a 
reference to the map
+  val tmpIsNull = null == underlyingMapRef.get.orNull
+  assert(!tmpIsNull)
+}
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(map.numSpills == 0)
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+// assert(map.currentMap == null)
+eventually{
--- End diff --

nit: add a space `eventually {`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480296
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
--- End diff --

nit: no space before `:`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480190
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -568,13 +573,14 @@ class ExternalAppendOnlyMap[K, V, C](
 context.addTaskCompletionListener[Unit](context => cleanup())
   }
 
-  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+  /**
+   * Exposed for testing
+   */
+  private[collection] class SpillableIterator(var upstream: Iterator[(K, 
C)])
--- End diff --

ditto


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209480155
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -114,7 +117,10 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
-  @volatile private var readingIterator: SpillableIterator = null
+  /**
+   * Exposed for testing
+   */
+  @volatile private[collection] var readingIterator: SpillableIterator = 
null
--- End diff --

This is not exposed in the test.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-06-28 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r198816624
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan ?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-06-03 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r192631230
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan , can we move on with this?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-30 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191655937
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan, how do you suggest we proceed with this?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191077231
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan, I've tried the `System.gc` approach both with and without `eventually`, to no avail.
It seems scalatest's asserts use some black-magic, macro-based voodoo to generate a local method/val (when I got about 10 classes deep into my quest, I gave up).
Basically, what I'm seeing in VisualVM is the expected ref via the `WeakReference` and another, surprising one via `UnaryMacroBool.left`.

It seems this sneaky ref was generated by the following assertion: `assert(map.currentMap == null)`, but it could just as easily have been generated elsewhere.

@cloud-fan, @gatorsmile, can you please confirm whether and how we can import the Scala code? Otherwise, can you think of an alternative approach for testing this?
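
For the record, the workaround the test later settled on (visible in the revised test quoted earlier in this digest) is to evaluate the condition into a plain local `val`, so the assert macro only captures a `Boolean` rather than the map itself; a sketch, reusing the `underlyingMapRef` name from that test:

```scala
// Evaluate the nullness check outside of assert(...) so the macro-generated
// code holds a Boolean, not a reference to the underlying map.
eventually {
  System.gc()
  val refIsCleared = underlyingMapRef.get.orNull == null
  assert(refIsCleared)
}
```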


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-26 Thread som-snytt
Github user som-snytt commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191064975
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
--- End diff --

+1


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191050461
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

Anyway, `eventually` is available in Spark tests; we can use it if needed.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191050447
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

`AccumulatorSuite` has a similar pattern and it works well (search for `System.gc`). Maybe we don't need `eventually`?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-25 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190829341
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

this requires using something like [scalatest's 
eventually](http://doc.scalatest.org/1.8/org/scalatest/concurrent/Eventually.html),
 don't you think?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190822967
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

How about this:
```
val mapRef = new WeakReference(map.currentMap)
... // trigger spilling
System.gc()
assert(mapRef.get == null)
```


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190584765
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+private[ExternalAppendOnlyMap]
--- End diff --

It's weird to see a class-private method. I'd suggest just removing `private[ExternalAppendOnlyMap]`; `spill` is only called in `ExternalAppendOnlyMap`, and it's public.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190583904
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert(it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
--- End diff --

`assert(underlyingIt != null)`: we should not put spaces inside the parentheses. Can you fix all of them?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-24 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190542635
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

Hmm, we can in line 508 but not in this test.
In this test we look at the iterator immediately after a spill; at this point `upstream` is supposed to have been replaced by a `DiskMapIterator`, so I guess we can check for this directly (after relaxing its visibility to package private).

In line 508, we can simply compare with `Iterator.empty`.
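
For illustration, the two direct checks could look roughly like this; this is only a sketch that reuses `map` and `underlyingIt` from the test above and assumes `DiskMapIterator`'s visibility is relaxed as suggested:

```scala
// Right after a forced spill, upstream should have been swapped for the on-disk iterator
// (requires DiskMapIterator to be at least package-private).
assert(underlyingIt.upstream.isInstanceOf[map.DiskMapIterator])

// Once the iterator is fully exhausted (the "line 508" case), destroy() has replaced it:
assert(underlyingIt.upstream eq Iterator.empty)
```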


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190494153
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

can we simply check `assert(underlyingIt.upstream eq Iterator.empty)`?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190375595
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@cloud-fan, the assumption here is that there are two references to the underlying map: one via the upstream iterator and one via the external map itself.
The destroy method clears the ref held through the upstream iterator and also delegates to the method that clears the ref held by the external map member (`currentMap`, I think), so unless we've missed another ref we should be fine.

As I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator`, which keeps holding references via its `sub` and `completionFunction` members; these might prevent some objects from being collected and could be dropped once the iterator is exhausted. There might be some more 'candidates' like `LazyIterator` and `InterruptibleIterator`; I think this deserves some more investigation.
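
To make the two references concrete, here's a conceptual sketch (the shape follows the diff above; the in-memory map type is left generic on purpose):

```scala
// Conceptual sketch of the reference chain the fix breaks:
//
//   ExternalAppendOnlyMap.currentMap ──┐
//                                      ├──> in-memory map (potentially large)
//   SpillableIterator.upstream ────────┘
//
// destroy() severs both edges, as in the diff above:
//   freeCurrentMap()            // clears the external map's own reference
//   upstream = Iterator.empty   // clears the iterator's reference
```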


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190372506
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
--- End diff --

@cloud-fan, the assumption here is that there are two references to the underlying map: one via the upstream iterator and one via the external map itself.
The destroy method clears the ref held through the upstream iterator and also delegates to the method that clears the ref held by the external map member (`currentMap`, I think), so unless we've missed another ref we should be fine.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190371425
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+private[ExternalAppendOnlyMap]
--- End diff --

Hmm... the class itself is private (slightly relaxed to package private to ease testing), so I'm not sure what the benefit of making the method public would be.
In any case, I think that once we see a use case for making this method public, we'd probably have to further change the iterator/external map classes.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190370842
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
+
+val next50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(!it.hasNext)
+val keys = (first50Keys ++ next50Keys).sorted
+assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
--- End diff --

:+1: 


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190370765
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

The underlying map's iterator is an anonymous class; this is the best I could come up with to check whether the upstream iterator holds a ref to the underlying map.
@cloud-fan, do you have a better idea? (I'm not 100% happy with this one.)


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190284699
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

`destroy` re-assigns `upstream`, so once `destroy` is called we should be fine?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190286827
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

We want to prove we are no longer holding the reference; why do we check the type here?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190280317
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+private[ExternalAppendOnlyMap]
--- End diff --

it's pretty reasonable to have this method public.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190279240
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
+
+val next50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(!it.hasNext)
+val keys = (first50Keys ++ next50Keys).sorted
+assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
--- End diff --

Let's also put the JIRA number in the test name.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190285086
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
--- End diff --

nit: no space after `assert(`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189939603
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

`destructiveIterator` should just return a destructive iterator (especially for the map buffer), as its function name implies, and that is no business of `CompletionIterator`'s. And developers should be free to define the completion function for the returned destructive iterator, in case we want a different one somewhere else in the future.

> Your suggested code does exactly the same thing but is less streamlined

I don't think this little change has much influence on how streamlined it is.

> and relies on an intermediate value (fortunately it's already a member variable)

The current fix leads to this, not me. And even if this variable were not a member variable, we could define a temporary local variable. It's not a big deal.






---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189921783
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
--- End diff --

This test was written BEFORE the actual fix, and it did fail until the fix was in place. I do agree it's a bit clumsy and that potential future changes may break the original intention of the test. I've referenced a potential testing approach (currently limited to Scala's own source code) which couldn't (easily) be applied to this code base, so I made a best effort to test this.
I agree this needs better documentation; I'll start by referencing the issue in the test's name and will also add comments to the code.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189919617
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
--- End diff --

Safer: the class remains usable if for some reason `hasNext` is called again, and this costs absolutely nothing.
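
A minimal illustration of the difference (plain Scala, not tied to the PR's code):

```scala
// With Iterator.empty, a stray call after cleanup stays benign:
var upstream: Iterator[Int] = Iterator.empty
println(upstream.hasNext)   // prints "false", no exception

// With null, the same stray call would blow up:
upstream = null
// upstream.hasNext         // would throw a NullPointerException
```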


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189919031
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

What behavior does it change? Your suggested code does exactly the same thing but is less streamlined and relies on an intermediate value (fortunately it's already a member variable).


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189894423
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
--- End diff --

I understand what this test wants to do. But it seems the code without this PR could also pass it if everything goes normally. And I know it's a little hard to reflect the change in a unit test. Also, I'd prefer to leave some comments explaining the potential memory leak in the source code above.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189892444
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

This changes the original behavior of `destructiveIterator`. I'd prefer to do it like this:

```
CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), readingIterator.destroy)
```
which keeps compatibility with the current code and does not introduce an unnecessary function.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189892547
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
--- End diff --

Why `empty`, not `null`?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794281
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+def toCompletionIterator: CompletionIterator[(K, C), 
SpillableIterator] = {
--- End diff --

done


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794046
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // Input streams are derived both from the in-memory map and spilled 
maps on disk
 // The in-memory map is sorted in place, while the spilled maps are 
already in sorted order
-private val sortedMap = CompletionIterator[(K, C), Iterator[(K, 
C)]](destructiveIterator(
-  currentMap.destructiveSortedIterator(keyComparator)), 
freeCurrentMap())
+private val sortedMap = destructiveIterator(
+  currentMap.destructiveSortedIterator(keyComparator))
--- End diff --

Unfortunately no, the Scala style check enforces a max of 100 chars per line.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794097
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
--- End diff --

yes, fixing


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-21 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189768075
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // Input streams are derived both from the in-memory map and spilled 
maps on disk
 // The in-memory map is sorted in place, while the spilled maps are 
already in sorted order
-private val sortedMap = CompletionIterator[(K, C), Iterator[(K, 
C)]](destructiveIterator(
-  currentMap.destructiveSortedIterator(keyComparator)), 
freeCurrentMap())
+private val sortedMap = destructiveIterator(
+  currentMap.destructiveSortedIterator(keyComparator))
--- End diff --

These two lines can be merged into one line?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-21 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189768116
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
--- End diff --

Should be private, like `freeCurrentMap`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-21 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189768301
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -630,7 +643,7 @@ private[spark] object ExternalAppendOnlyMap {
   }
 
   /**
-   * A comparator which sorts arbitrary keys based on their hash codes.
+   * A comparator which sorts arbitrary keys bas on their hash codes.
--- End diff --

typo?
bas -> based.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-21 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189768335
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -23,8 +23,9 @@ import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.util.CompletionIterator
 
-class ExternalAppendOnlyMapSuite extends SparkFunSuite with 
LocalSparkContext {
+class ExternalAppendOnlyMapSuite extends SparkFunSuite with 
LocalSparkContext{
--- End diff --

Note the missing space; it should be `LocalSparkContext {`


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-21 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189768271
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+def toCompletionIterator: CompletionIterator[(K, C), 
SpillableIterator] = {
--- End diff --

I'd prefer private for this method


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-20 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189452094
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@jerrylead, I'd appreciate it if you could test this.
One more thing that bugs me: there's another case where the iterator no longer 
needs the upstream iterator/underlying map but still holds a reference to it.
When the iterator reaches EOF, its hasNext method returns false, which causes 
the wrapping CompletionIterator to call the cleanup function, which simply nulls 
the underlying map member variable in ExternalAppendOnlyMap. The iterator member 
is not nulled out, so we end up with the CompletionIterator holding two paths to 
the upstream iterator, which leads to the underlying map: first, it still holds 
a reference to the iterator itself; second, it still holds a reference to the 
cleanup closure, which refers to the ExternalAppendOnlyMap, which still refers 
to the current iterator, which refers to upstream...
This can be solved in one of two ways:
Simple way: when creating the completion iterator, provide a closure referring 
to the iterator, not the ExternalAppendOnlyMap.
Thorough way: modify CompletionIterator to null out its references after 
cleaning up.

Having said that, I'm not sure how long a completed iterator may be 'sitting' 
before being discarded, so I'm not sure this is worth fixing, especially using 
the thorough approach.
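
A minimal sketch of the 'thorough way' described above, using a simplified 
stand-in for Spark's CompletionIterator (the class and member names here are 
illustrative, not the actual Spark code): once the wrapped iterator is 
exhausted, the cleanup function runs exactly once and both references are 
dropped, so neither the sub-iterator nor the closure keeps the underlying map 
reachable.

    // Illustrative sketch only; not Spark's CompletionIterator.
    class SelfReleasingCompletionIterator[A](
        subIterator: Iterator[A],
        completionFunction: () => Unit) extends Iterator[A] {

      private[this] var sub: Iterator[A] = subIterator
      private[this] var cleanup: () => Unit = completionFunction
      private[this] var completed = false

      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          val f = cleanup
          // Drop both paths to the upstream iterator / underlying map, then
          // run the cleanup, so a completed iterator that is still referenced
          // elsewhere no longer pins them for the GC.
          sub = Iterator.empty
          cleanup = () => ()
          f()
        }
        more
      }

      override def next(): A = sub.next()
    }

With something like this, even a fully consumed iterator that keeps sitting in 
a task's local variables would no longer keep the in-memory map alive.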



---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-19 Thread JerryLead
Github user JerryLead commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189451809
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

Thanks for fixing this issue. I think the potential solution is to change 
the `upstream` reference, but I have not tested whether this change is 
sufficient and safe.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189438351
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

Basically yes; according to my understanding of the code this should have 
happened on the subsequent hasNext/next call. However, according to the 
analysis in the JIRA, the iterator kept holding this reference. My guess: at 
this point the entire program started suffering lengthy GC pauses that got it 
behaving as if under a deadlock, effectively leaving the ref in place (just a 
guess).


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-19 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189438190
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

Does the change mean we should reassign `upstream` (which eliminates the 
reference to `currentMap`) **immediately** after a spill? Otherwise, we may 
hit an OOM (e.g. if `readNext()` is never called after the spill; is this the 
real cause of the JIRA issue?)


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-18 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/21369

[SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during 
iteration

## What changes were proposed in this pull request?
This PR solves 
[SPARK-22713](https://issues.apache.org/jira/browse/SPARK-22713) which 
describes a memory leak that occurs when an ExternalAppendOnlyMap is spilled 
during iteration (as opposed to insertion).

ExternalAppendOnlyMap's iterator supports spilling, but it kept a reference 
to the internal map (via an internal iterator) after spilling. It seems that 
the original code was actually supposed to 'get rid' of this reference on the 
next iteration, but according to the elaborate investigation described in the 
JIRA this didn't happen.
The fix is simply replacing the internal iterator immediately after spilling.
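
As a rough illustration of that swap-on-spill idea (simplified, hypothetical 
names, not the actual SpillableIterator code; it assumes the supplied spill 
function fully drains the in-memory iterator to disk):

    // Illustrative sketch only: on a forced spill, drain the remaining
    // in-memory entries to disk and immediately repoint `upstream` at the
    // disk-backed iterator, so the in-memory iterator (and the map behind it)
    // become garbage-collectable right away rather than on the next
    // hasNext/next call.
    class SwapOnSpillIterator[A](
        private var upstream: Iterator[A],
        spillToDisk: Iterator[A] => Iterator[A]) extends Iterator[A] {

      private var hasSpilled = false

      def forceSpill(): Boolean = {
        if (hasSpilled) {
          false
        } else {
          val diskBacked = spillToDisk(upstream) // drains what is left in memory
          hasSpilled = true
          upstream = diskBacked                  // drop the in-memory reference
          true
        }
      }

      override def hasNext: Boolean = upstream.hasNext
      override def next(): A = upstream.next()
    }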

## How was this patch tested?
I've introduced a new test to the test suite ExternalAppendOnlyMapSuite; this 
test asserts that neither the external map itself nor its iterator holds any 
reference to the internal map after a spill.
This approach required relaxing the access of some member variables and nested 
classes of ExternalAppendOnlyMap; these members are now package private and 
annotated with @VisibleForTesting.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark 
SPARK-22713__ExternalAppendOnlyMap_effective_spill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21369


commit 1c4a6af2077e7ac50476101031aa1af99ff4f7b7
Author: Eyal Farago 
Date:   2018-05-18T22:06:15Z

SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test.

commit 82591e63bf30fad37b90956c226101e428d39787
Author: Eyal Farago 
Date:   2018-05-18T22:21:33Z

SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by 
removing the reference to the initial iterator.




---
