Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11534#discussion_r55735070
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
    @@ -739,46 +696,105 @@ private[spark] class BlockManager(
           blockId: BlockId,
           bytes: ByteBuffer,
           level: StorageLevel,
    -      tellMaster: Boolean = true,
    -      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
    +      tellMaster: Boolean = true): Boolean = {
         require(bytes != null, "Bytes is null")
    -    val result = doPut(blockId, ByteBufferValues(bytes), level, 
tellMaster, effectiveStorageLevel)
    -    result == DoPutSucceeded
    +    doPutBytes(blockId, bytes, level, tellMaster)
       }
     
       /**
    -   * Put the given block according to the given level in one of the block 
stores, replicating
    +   * Put the given bytes according to the given level in one of the block 
stores, replicating
        * the values if necessary.
        *
        * If the block already exists, this method will not overwrite it.
        *
    -   * @param effectiveStorageLevel the level according to which the block 
will actually be handled.
    -   *                              This allows the caller to specify an 
alternate behavior of doPut
    -   *                              while preserving the original level 
specified by the user.
        * @param keepReadLock if true, this method will hold the read lock when 
it returns (even if the
        *                     block already exists). If false, this method will 
hold no locks when it
        *                     returns.
    -   * @return [[DoPutSucceeded]] if the block was already present or if the 
put succeeded, or
    -   *        [[DoPutBytesFailed]] if the put failed and we were storing 
bytes, or
    -   *        [[DoPutIteratorFailed]] if the put failed and we were storing 
an iterator.
    +   * @return true if the block was already present or if the put 
succeeded, false otherwise.
        */
    -  private def doPut(
    +  private def doPutBytes(
           blockId: BlockId,
    -      data: BlockValues,
    +      bytes: ByteBuffer,
           level: StorageLevel,
           tellMaster: Boolean = true,
    -      effectiveStorageLevel: Option[StorageLevel] = None,
    -      keepReadLock: Boolean = false): DoPutResult = {
    +      keepReadLock: Boolean = false): Boolean = {
    +    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { putBlockInfo =>
    +      val startTimeMs = System.currentTimeMillis
    +      // Since we're storing bytes, initiate the replication before 
storing them locally.
    +      // This is faster as data is already serialized and ready to send.
    +      val replicationFuture = if (level.replication > 1) {
    +        // Duplicate doesn't copy the bytes, but just creates a wrapper
    +        val bufferView = bytes.duplicate()
    +        Future {
    +          // This is a blocking action and should run in 
futureExecutionContext which is a cached
    +          // thread pool
    +          replicate(blockId, bufferView, level)
    +        }(futureExecutionContext)
    +      } else {
    +        null
    +      }
    +
    +      bytes.rewind()
    +      val size = bytes.limit()
    +
    +      if (level.useMemory) {
    +        // Put it in memory first, even if it also has useDisk set to true;
    +        // We will drop it to disk later if the memory store can't hold it.
    +        val putSucceeded = if (level.deserialized) {
    +          val values = dataDeserialize(blockId, bytes.duplicate())
    +          memoryStore.putIterator(blockId, values, level).isRight
    +        } else {
    +          memoryStore.putBytes(blockId, size, () => bytes)
    +        }
    +        if (!putSucceeded && level.useDisk) {
    +          logWarning(s"Persisting block $blockId to disk instead.")
    +          diskStore.putBytes(blockId, bytes)
    +        }
    +      } else if (level.useDisk) {
    +        diskStore.putBytes(blockId, bytes)
    +      }
    +
    +      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    +      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    +      if (blockWasSuccessfullyStored) {
    +        // Now that the block is in either the memory, externalBlockStore, 
or disk store,
    +        // tell the master about it.
    +        putBlockInfo.size = size
    +        if (tellMaster) {
    +          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
    +        }
    +        Option(TaskContext.get()).foreach { c =>
    +          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
putBlockStatus)))
    +        }
    +      }
    +      logDebug("Put block %s locally took %s".format(blockId, 
Utils.getUsedTimeMs(startTimeMs)))
    +      if (level.replication > 1) {
    +        // Wait for asynchronous replication to finish
    +        Await.ready(replicationFuture, Duration.Inf)
    +      }
    +      if (blockWasSuccessfullyStored) {
    +        None
    +      } else {
    +        Some(bytes)
    --- End diff --
    
    I don't think we ever use this returned value (the `Some(bytes)`) anywhere. 
It would be simpler to just return `blockWasSuccessfullyStored` here, so that 
this method can return the result of `doPut` directly instead of 
`doPut(...) { ... }.isEmpty`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to