GitHub user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/791#discussion_r12840780
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -166,45 +166,51 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
- /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
- * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
- * been released, it must be ensured that those to-be-dropped blocks are not double counted
- * for freeing up more space for another block that needs to be put. Only then the actually
- * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- putLock.synchronized {
- val freeSpaceResult = ensureFreeSpace(blockId, size)
- val enoughFreeSpace = freeSpaceResult.success
- droppedBlocks ++= freeSpaceResult.droppedBlocks
- if (enoughFreeSpace) {
- val entry = new Entry(value, size, deserialized)
- entries.synchronized {
- entries.put(blockId, entry)
- currentMemory += size
+ val toBeDroppedBlocks = putLock.synchronized { ensureFreeSpace(blockId, size) }
+ if (toBeDroppedBlocks.isDefined) {
+ try {
+ toBeDroppedBlocks.get.foreach { block =>
+ val droppedBlockStatus = blockManager.dropFromMemory(block.id, block.data)
+ droppedBlockStatus.map { status => droppedBlocks += blockId -> status }
}
- if (deserialized) {
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
- } else {
- logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
- blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
+ } catch {
+ // if task hit exception or cancelled by executor, then reset dropping flags of selected blocks
+ case e: Exception => {
+ toBeDroppedBlocks.get.foreach { block =>
+ val entry = entries.synchronized{ entries.get(block.id) }
+ if (entry != null) entry.dropping = false
+ }
+ throw e
--- End diff --
Instead of catching Exception and rethrowing, move the reset into a finally block, guarded by a status variable.
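A minimal Scala sketch of the suggested shape, under stated assumptions: `Entry`, `SketchStore`, `selectBlocksToDrop`, and `dropSelected` are hypothetical simplifications, not Spark's `MemoryStore` API, and the selection loop simply frees at least `needed` bytes without looking at current free memory. Blocks are picked and marked `dropping` while holding a lock (so a concurrent put does not count them twice, as the removed TODO asks), dropped outside that lock, and a `completed` flag checked in `finally` decides whether to clear the marks.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MemoryStore's Entry; only `size` and the
// `dropping` flag matter for this sketch.
final class Entry(val size: Long) {
  @volatile var dropping: Boolean = false
}

// Hypothetical, simplified store (not Spark's API) illustrating
// "reset flags in finally, based on a status variable".
final class SketchStore {
  val entries = new java.util.LinkedHashMap[String, Entry]()
  private val putLock = new Object

  // Under the locks, pick blocks not already being dropped until at least
  // `needed` bytes are covered, then mark them so another put skips them.
  // Returns None (and marks nothing) if not enough space can be freed.
  def selectBlocksToDrop(needed: Long): Option[Seq[String]] = putLock.synchronized {
    entries.synchronized {
      val selected = mutable.ArrayBuffer.empty[String]
      var freed = 0L
      val it = entries.entrySet().iterator()
      while (freed < needed && it.hasNext) {
        val e = it.next()
        if (!e.getValue.dropping) {
          selected += e.getKey
          freed += e.getValue.size
        }
      }
      if (freed >= needed) {
        selected.foreach(id => entries.get(id).dropping = true)
        Some(selected.toSeq)
      } else None
    }
  }

  // Drop the selected blocks outside putLock. `dropOne` is a hypothetical
  // stand-in for blockManager.dropFromMemory and may throw.
  def dropSelected(selected: Seq[String])(dropOne: String => Unit): Unit = {
    var completed = false // status variable consulted in finally
    try {
      selected.foreach(dropOne)
      completed = true
    } finally {
      if (!completed) {
        // Failure or cancellation: clear the mark on blocks still in memory
        // so later puts may select them again. No catch/rethrow needed.
        selected.foreach { id =>
          val entry = entries.synchronized { entries.get(id) }
          if (entry != null) entry.dropping = false
        }
      }
    }
  }
}
```

Compared with `catch { case e: Exception => ...; throw e }`, the `finally` form needs no rethrow and also runs when a `Throwable` that is not an `Exception` (for example an `Error`) escapes the drop loop.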