Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19311#discussion_r140559549
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
}
if (freedMemory >= space) {
- logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
- s"(${Utils.bytesToString(freedMemory)} bytes)")
- for (blockId <- selectedBlocks) {
- val entry = entries.synchronized { entries.get(blockId) }
- // This should never be null as only one task should be dropping
- // blocks and removing entries. However the check is still here for
- // future safety.
- if (entry != null) {
- dropBlock(blockId, entry)
+ var exceptionWasThrown: Boolean = true
+ try {
+ logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+ s"(${Utils.bytesToString(freedMemory)} bytes)")
+ for (blockId <- selectedBlocks) {
+ val entry = entries.synchronized {
+ entries.get(blockId)
+ }
+ // This should never be null as only one task should be dropping
+ // blocks and removing entries. However the check is still here for
+ // future safety.
+ if (entry != null) {
+ dropBlock(blockId, entry)
+ }
+ }
+ exceptionWasThrown = false
+ logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+ s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+ freedMemory
+ } finally {
+ // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+ // with InterruptedException
+ if (exceptionWasThrown) {
+ selectedBlocks.foreach { id =>
+ // some of the blocks may have already been unlocked, or completely removed
+ blockInfoManager.get(id).foreach { info =>
--- End diff --
good point, thanks, I've handled this now
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]