Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11534#discussion_r55615052
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
    @@ -428,77 +458,46 @@ private[spark] class BlockManager(
           Option(
             
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
         } else {
    -      doGetLocal(blockId, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
    -    }
    -  }
    -
    -  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): 
Option[Any] = {
    -    blockInfoManager.lockForReading(blockId) match {
    -      case None =>
    -        logDebug(s"Block $blockId was not found")
    -        None
    -      case Some(info) =>
    -        doGetLocal(blockId, info, asBlockResult)
    +      blockInfoManager.lockForReading(blockId) match {
    +        case None =>
    +          logDebug(s"Block $blockId was not found")
    +          None
    +        case Some(info) =>
    +          Some(doGetLocalBytes(blockId, info))
    +      }
         }
       }
     
       /**
    -   * Get a local block from the block manager.
    -   * Assumes that the caller holds a read lock on the block.
    +   * Get block from the local block manager as serialized bytes.
    +   *
    +   * Must be called while holding a read lock on the block.
    +   * Releases the read lock upon exception; keeps the read lock upon 
successful return.
        */
    -  private def doGetLocal(
    -      blockId: BlockId,
    -      info: BlockInfo,
    -      asBlockResult: Boolean): Option[Any] = {
    +  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): 
ByteBuffer = {
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
    -
    -    // Look for the block in memory
    -    if (level.useMemory) {
    -      logDebug(s"Getting block $blockId from memory")
    -      val result = if (asBlockResult) {
    -        memoryStore.getValues(blockId).map { iter =>
    -          val ci = CompletionIterator[Any, Iterator[Any]](iter, 
releaseLock(blockId))
    -          new BlockResult(ci, DataReadMethod.Memory, info.size)
    -        }
    +    // In order, try to read the serialized bytes from memory, then from 
disk, then fall back to
    +    // serializing in-memory objects, and, finally, throw an exception if 
the block does not exist.
    +    if (level.deserialized) {
    +      // Try to avoid expensive serialization by reading a pre-serialized 
copy from disk:
    +      if (level.useDisk && diskStore.contains(blockId)) {
    +        diskStore.getBytes(blockId)
    --- End diff --
    
    I would add a comment here saying that we don't bother putting the block 
back in memory because we'd have to deserialize the data, and that might be slow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to