[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21322


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r224874828
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,30 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
+    maybeReleaseResources(resource._1, resource._2)
+  }
+
+  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values, blockId)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
+    if (blockId.isBroadcast) {
+      values.foreach(value => Utils.tryClose(value))
--- End diff --

Just a style thing, but could be `values.foreach(Utils.tryClose)`


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r224875899
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1930,6 +1930,18 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  def tryClose(value: Any): Unit = {
--- End diff --

This should accept `AnyRef` at most. It doesn't really seem like we need a new global utility method for this. It's a little unusual to try closing things that aren't `Closeable`, and we can try to rationalize that in the callers above if possible.
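
A minimal sketch of what keeping the close logic in the caller (rather than a new global `Utils.tryClose`) could look like; this assumes it lives inside `MemoryStore`, so `logWarning` from `Logging` is in scope, and is not the committed code:

    import scala.util.control.NonFatal

    private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
      // Only broadcast blocks are expected to carry AutoCloseable user objects here.
      if (blockId.isBroadcast) {
        values.foreach {
          case closeable: AutoCloseable =>
            try {
              closeable.close()
            } catch {
              case NonFatal(e) => logWarning(s"Failed to close $closeable", e)
            }
          case _ => // nothing to release for non-closeable values
        }
      }
    }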


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r224875111
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,30 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
+    maybeReleaseResources(resource._1, resource._2)
+  }
+
+  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
--- End diff --

Why not just make these case classes `Closeable`? Then you can close them consistently.
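
A simplified, hypothetical sketch of that suggestion; the type and field names below are illustrative stand-ins, not the real `MemoryEntry`/`ChunkedByteBuffer` definitions. Each entry type owns its cleanup, so callers just call close():

    import java.io.Closeable

    sealed trait Entry extends Closeable

    final case class SerializedEntry(buffer: java.nio.ByteBuffer) extends Entry {
      // the real SerializedMemoryEntry would call ChunkedByteBuffer.dispose() here
      override def close(): Unit = { buffer.clear() }
    }

    final case class DeserializedEntry(values: Array[Any]) extends Entry {
      // best-effort close of any AutoCloseable values, mirroring safelyCloseValue above
      override def close(): Unit = values.foreach {
        case c: AutoCloseable =>
          try c.close() catch { case _: Exception => } // swallow and continue
        case _ => // nothing to release
      }
    }

Callers could then release any entry uniformly, e.g. `entries.values().asScala.foreach(_.close())`.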


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-18 Thread JeetKunDoug
Github user JeetKunDoug commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r189276054
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
     if (entry != null) {
-      entry match {
-        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-        case _ =>
+      if (blockId.isBroadcast) {
+        maybeReleaseResources(entry)
--- End diff --

@dbtsai thanks - That whole "my day job vs. OSS" rush to fix. Will take 
care of it correctly and push an update.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r189210100
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
--- End diff --

To do it in `remove`, I don't think we can avoid the issue I mentioned before. If you have a deserialized value in the broadcast cache, it can be cleaned up by GC if that broadcast value isn't stored as a deserialized entry in `MemoryStore`.

If the object already holds resources that we want to release via the `AutoCloseable` interface, we won't release them properly when the object is cleaned up by GC. That happens before `remove` is called.




---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-18 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r189187759
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -404,6 +428,7 @@ private[spark] class MemoryStore(
 
   def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
+      entries.values().asScala.foreach(maybeReleaseResources)
--- End diff --

Should we check if the keys are `blockId.isBroadcast == true`? 


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-18 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r189187178
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
     if (entry != null) {
-      entry match {
-        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-        case _ =>
+      if (blockId.isBroadcast) {
+        maybeReleaseResources(entry)
--- End diff --

In this case, what happens when the blockId is not broadcast? The existing cleanup will not be called.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-15 Thread JeetKunDoug
Github user JeetKunDoug commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188325608
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

Actually, digging further, there are other places where we may deserialize an object from the disk store and never put it into the memory store. It seems like punting on a guarantee that your AutoCloseable object is closed, and making this a best-effort thing when calling `BlockManager.removeBroadcast` (which is how we used it in the case that prompted this PR), may make the most sense. It'll still be better than depending on GC and a finalizer to get the resources cleaned up when the driver can call `Broadcast#destroy`, but we can document it as a best practice to have one just in case the close() call doesn't happen due to edge cases.
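
A hedged sketch of the "best effort plus safety net" usage pattern described above; the `Resource` class and the driver-side snippet are hypothetical illustrations, not part of this PR:

    class Resource extends AutoCloseable with Serializable {
      override def close(): Unit = {
        // release native handles, files, connections, etc.
      }
      // safety net in case close() is never invoked through the block manager path
      override protected def finalize(): Unit = close()
    }

    // driver side (illustrative):
    // val bc = sc.broadcast(new Resource)
    // ... tasks use bc.value ...
    // bc.destroy()  // removeBroadcast is what makes the best-effort close() possible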


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-15 Thread JeetKunDoug
Github user JeetKunDoug commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188314886
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

Ah, ok - I see where the issue is. So in this case you may have a deserialized instance, but the memory store is full, so it fails to be put. Now we've got a live, deserialized object that is not in MemoryStore. Thanks for catching this. It looks like this case could be handled in `MemoryStore.putIteratorAsValues` when the `putIterator` call fails, which would cover several cases in `BlockManager` where we try (and fail) to put deserialized values, but it means a check for potential `AutoCloseable` values any time we fail to put into `MemoryStore`, and I'm not sure of the performance impact of that.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188306362
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

In theory, you can have a working broadcast object that is not in `MemoryStore` at the same time.

While storing the merged object into `BlockManager` by calling `putSingle`, it can be stored into the disk store.

Once the object is going to be used, if we can't find it in the cache, we call `BlockManager.getLocalValues` to retrieve it back from the disk store. Although that will try to store it into `MemoryStore`, it may not succeed.

I think the point is that this change assumes that if there is a deserialized broadcast object, it is definitely in `MemoryStore`. But if I read the code correctly, that is not the case. You can have serialized bytes of the object in the disk store and use a deserialized object at the same time.





---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-15 Thread JeetKunDoug
Github user JeetKunDoug commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188295537
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

I wouldn't expect a never-deserialized Memory Entry to be closed, as it was 
never really instantiated to begin with - so if it _only_ lands on disk, I 
think that's reasonable (as the variable in question would never have had a 
chance to allocate anything either).


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188128515
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(objs: Array[Any]): Unit = {
+    objs.foreach {
+        case closable: AutoCloseable =>
--- End diff --

indent style: two spaces.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188128177
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

As far as I know, broadcast variables can be serialized to disk too (`BlockManager.doPutIterator`). In that case, it seems `AutoCloseable` broadcast variables won't hit this release logic.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread JeetKunDoug
Github user JeetKunDoug commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188074445
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
--- End diff --

So if I understand the API correctly, there is no way to remove a single 
item that was put as part of a call to `putIterator` - because operations are 
conducted by `blockId` you would only be able to remove the whole group of 
entries, not a single part of an iterator.


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188033806
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val tracker2 = new CloseTracker
+    val tracker1 = new CloseTracker
+    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val id2 = BroadcastBlockId(2)
+    val tracker2 = new CloseTracker(true)
+    val tracker1 = new CloseTracker(true)
+    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
+    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    assert(store.contains(id2))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+}
+
+private class CloseTracker (val throwsOnClosed: Boolean = false) extends AutoCloseable {
+  var closed = false
+  override def close(): Unit = {
+    closed = true
+    if (throwsOnClosed) {
+      throw new RuntimeException("Throwing")
--- End diff --

Could you add `var isExceptionThrown = false` and check in the test whether the exception is actually thrown?
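
One possible shape for that, sketched as a hypothetical variant of the test helper (not the committed code):

    private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
      var closed = false
      var isExceptionThrown = false
      override def close(): Unit = {
        closed = true
        if (throwsOnClosed) {
          isExceptionThrown = true // record the throw so tests can assert on it
          throw new RuntimeException("Throwing")
        }
      }
    }

The exception tests could then also assert `tracker.isExceptionThrown` after `store.remove(id)` / `store.clear()`.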


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188034118
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val tracker2 = new CloseTracker
+    val tracker1 = new CloseTracker
+    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val id2 = BroadcastBlockId(2)
+    val tracker2 = new CloseTracker(true)
+    val tracker1 = new CloseTracker(true)
+    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
+    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    assert(store.contains(id2))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+}
+
+private class CloseTracker (val throwsOnClosed: Boolean = false) extends AutoCloseable {
+  var closed = false
+  override def close(): Unit = {
+    closed = true
+    if (throwsOnClosed) {
+      throw new RuntimeException("Throwing")
+    }
+  }
+  def getClosed(): Boolean = {
+    closed
--- End diff --

Since `closed` is public, you might use it directly. Or you could make `closed` private.
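
Sketches of the two options (hypothetical, for illustration):

    // 1) keep the field public and assert on it directly in the tests:
    //    assert(tracker.closed)

    // 2) or hide the field behind the accessor:
    private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
      private var closed = false
      override def close(): Unit = {
        closed = true
        if (throwsOnClosed) {
          throw new RuntimeException("Throwing")
        }
      }
      def getClosed(): Boolean = closed
    }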


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188035443
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
--- End diff --

Can you check that if you have multiple AutoCloseable objects in an iterator and only one of them is removed, the rest of the objects will not be closed?
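
Removal happens per `BlockId` (a single value inside a put iterator can't be removed on its own, as noted elsewhere in this thread), so the closest testable sketch is probably per-block isolation. A rough, hypothetical test assuming the suite's existing helpers (`makeMemoryStore`, `CloseTracker`):

    test("[SPARK-24225]: removing one block should not close objects in other blocks") {
      val (store, _) = makeMemoryStore(12000)
      val id1 = BroadcastBlockId(1)
      val id2 = BroadcastBlockId(2)
      val tracker1 = new CloseTracker
      val tracker2 = new CloseTracker
      store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
      store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
      store.remove(id1)
      assert(tracker1.getClosed())
      assert(!tracker2.getClosed())
    }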


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188032854
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(objs: Array[Any]): Unit = {
--- End diff --

`values: Array[Any]`


---




[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

2018-05-14 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21322#discussion_r188032698
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

`DeserializedMemoryEntry(values, _, _)` to match the rest of code style. 


---
