GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5908#discussion_r30576550
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala ---
    @@ -62,42 +62,72 @@ private[spark] class ExternalBlockStore(blockManager: 
BlockManager, executorId:
           values: Iterator[Any],
           level: StorageLevel,
           returnValues: Boolean): PutResult = {
    -    logDebug(s"Attempting to write values for block $blockId")
    -    val bytes = blockManager.dataSerialize(blockId, values)
    -    putIntoExternalBlockStore(blockId, bytes, returnValues)
    +    putIntoExternalBlockStore(blockId, values, returnValues)
       }
     
       private def putIntoExternalBlockStore(
           blockId: BlockId,
    -      bytes: ByteBuffer,
    +      values: Iterator[_],
           returnValues: Boolean): PutResult = {
    -    // So that we do not modify the input offsets !
    -    // duplicate does not copy buffer, so inexpensive
    -    val byteBuffer = bytes.duplicate()
    -    byteBuffer.rewind()
    -    logDebug(s"Attempting to put block $blockId into ExtBlk store")
    +    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
         // we should never hit here if externalBlockManager is None. Handle it 
anyway for safety.
         try {
           val startTime = System.currentTimeMillis
           if (externalBlockManager.isDefined) {
    -        externalBlockManager.get.putBytes(blockId, bytes)
    +        externalBlockManager.get.putValues(blockId, values)
    +        val size = getSize(blockId)
    +        val data = if (returnValues) {
    +          Left(getValues(blockId).get)
    +        } else {
    +          null
    +        }
             val finishTime = System.currentTimeMillis
             logDebug("Block %s stored as %s file in ExternalBlockStore in %d 
ms".format(
    -          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - 
startTime))
    +          blockId, Utils.bytesToString(size), finishTime - startTime))
    +        PutResult(size, data)
    +      } else {
    +        logError(s"Error in putValues $blockId : no ExternalBlockManager 
exists!")
    +        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
    +      }
    +    } catch {
    +      case NonFatal(t) =>
    +        logError(s"Error in putValues $blockId", t)
    +        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
    +    }
    +  }
     
    -        if (returnValues) {
    -          PutResult(bytes.limit(), Right(bytes.duplicate()))
    +  private def putIntoExternalBlockStore(
    +      blockId: BlockId,
    +      bytes: ByteBuffer,
    +      returnValues: Boolean): PutResult = {
    +    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
    +    // we should never hit here if externalBlockManager is None. Handle it 
anyway for safety.
    +    try {
    +      val startTime = System.currentTimeMillis
    +      if (externalBlockManager.isDefined) {
    +        // So that we do not modify the input offsets !
    +        // duplicate does not copy buffer, so inexpensive
    +        val byteBuffer = bytes.duplicate()
    +        byteBuffer.rewind()
    +        externalBlockManager.get.putBytes(blockId, byteBuffer)
    +        val size = bytes.limit()
    +        val data = if (returnValues) {
    +          Right(bytes)
             } else {
    -          PutResult(bytes.limit(), null)
    +          null
             }
    +        val finishTime = System.currentTimeMillis
    +        logDebug("Block %s stored as %s file in ExternalBlockStore in %d 
ms".format(
    +          blockId, Utils.bytesToString(size), finishTime - startTime))
    +        PutResult(size, data)
           } else {
    -        logError(s"error in putBytes $blockId")
    -        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
    +        logError(s"Error in putBytes $blockId : no ExternalBlockManager 
exists!")
    --- End diff --
    
    ```
    s"Error in putBytes($blockId): no ExternalBlockManager has been configured."
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to