Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/791#discussion_r12841208
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -166,45 +166,51 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
           size: Long,
           deserialized: Boolean): ResultWithDroppedBlocks = {
     
    -    /* TODO: Its possible to optimize the locking by locking entries only 
when selecting blocks
    -     * to be dropped. Once the to-be-dropped blocks have been selected, 
and lock on entries has
    -     * been released, it must be ensured that those to-be-dropped blocks 
are not double counted
    -     * for freeing up more space for another block that needs to be put. 
Only then the actually
    -     * dropping of blocks (and writing to disk if necessary) can proceed 
in parallel. */
    -
         var putSuccess = false
         val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     
    -    putLock.synchronized {
    -      val freeSpaceResult = ensureFreeSpace(blockId, size)
    -      val enoughFreeSpace = freeSpaceResult.success
    -      droppedBlocks ++= freeSpaceResult.droppedBlocks
     
    -      if (enoughFreeSpace) {
    -        val entry = new Entry(value, size, deserialized)
    -        entries.synchronized {
    -          entries.put(blockId, entry)
    -          currentMemory += size
    +    val toBeDroppedBlocks = putLock.synchronized { 
ensureFreeSpace(blockId, size) }
    +    if (toBeDroppedBlocks.isDefined) {
    +      try {
    +        toBeDroppedBlocks.get.foreach { block =>
    +          val droppedBlockStatus = blockManager.dropFromMemory(block.id, 
block.data)
    +          droppedBlockStatus.map { status => droppedBlocks += blockId -> 
status }
             }
    -        if (deserialized) {
    -          logInfo("Block %s stored as values to memory (estimated size %s, 
free %s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    -        } else {
    -          logInfo("Block %s stored as bytes to memory (size %s, free 
%s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    +      } catch {
    +        // if task hit exception or cancelled by executor, then reset 
dropping flags of selected blocks
    +        case e: Exception => {
    +          toBeDroppedBlocks.get.foreach { block =>
    +            val entry = entries.synchronized{ entries.get(block.id) }
    +            if (entry != null) entry.dropping = false
    +          }
    +          throw e
             }
    -        putSuccess = true
    +      }
    +
    +      val entry = new Entry(value, size, deserialized)
    +      entries.synchronized {
    +        entries.put(blockId, entry)
    +        currentMemory += size
    +      }
    +      if (deserialized) {
    +        logInfo("Block %s stored as values to memory (estimated size %s, 
free %s)".format(
    +          blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
           } else {
    -        // Tell the block manager that we couldn't put it in memory so 
that it can drop it to
    -        // disk if the block allows disk storage.
    -        val data = if (deserialized) {
    -          Left(value.asInstanceOf[ArrayBuffer[Any]])
    -        } else {
    -          Right(value.asInstanceOf[ByteBuffer].duplicate())
    -        }
    -        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
    -        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, 
status)) }
    +        logInfo("Block %s stored as bytes to memory (size %s, free 
%s)".format(
    +          blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
           }
    +      putSuccess = true
    +    } else {
    +      // Tell the block manager that we couldn't put it in memory so that 
it can drop it to
    +      // disk if the block allows disk storage.
    +      val data = if (deserialized) {
    +        Left(value.asInstanceOf[ArrayBuffer[Any]])
    +      } else {
    +        Right(value.asInstanceOf[ByteBuffer].duplicate())
    +      }
    +      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
    +      droppedBlockStatus.map { status => droppedBlocks += blockId -> 
status }
    --- End diff --
    
    This changes behavior incorrectly relative to the earlier code, because the 
multi-threading (MT) semantics have changed: this block no longer runs within 
putLock, and so it can cause incorrect results.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to