Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11436#discussion_r54627020
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---
    @@ -307,29 +307,48 @@ private[storage] class BlockInfoManager extends Logging {
     
       /**
        * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
    -   * exist.
    +   * exist. If the block already exists, then this returns a read lock on that block.
    +   *
    +   * If another task already holds a write lock on this block, then this method will block until
    +   * that other thread releases or downgrades the lock.
        *
        * @param blockId the block id.
        * @param newBlockInfo the block info for the new block.
        * @return true if the block did not already exist, false otherwise. If this returns false, then
    -   *         no new locks are acquired. If this returns true, a write lock on the new block will
    -   *         be held.
    +   *         a read lock on the existing block will be held. If this returns true, a write lock on
    +   *         the new block will be held.
        */
       def lockNewBlockForWriting(
           blockId: BlockId,
           newBlockInfo: BlockInfo): Boolean = synchronized {
         logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
    -    if (!infos.contains(blockId)) {
    -      infos(blockId) = newBlockInfo
    -      newBlockInfo.writerTask = currentTaskAttemptId
    -      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    -      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    -      true
    -    } else {
    -      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
    -        s"because that block already exists")
    -      false
    +    while(true) {
    +      if (!infos.contains(blockId)) {
    +        // If there is no blockInfo for the given block, then the block does not exist and the
    +        // caller won the race to write this block, so create a new blockInfo and lock the block
    +        // for writing.
    +        infos(blockId) = newBlockInfo
    +        newBlockInfo.writerTask = currentTaskAttemptId
    +        writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    +        logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    +        return true
    +      } else {
    +        // Otherwise, the block already exists. If the block is locked for writing, however, it's
    +        // possible that the writer might delete the block or might be in the process of writing the
    +        // block (which could fail); in this case, we choose to block until that write lock is
    +        // released so we know whether the block is available for reading before returning. This
    +        // semantic is useful in the implementation of the BlockManager's getOrElseUpdate() method.
    +        if (lockForReading(blockId).isDefined) {
    --- End diff --
    
    This works, but I think there's a cleaner way to do the same thing without the while loop:
    ```
    /**
     * Attempt to acquire the appropriate lock for writing a new block.
     *
     * This enforces the first-writer-wins semantics. If we are the first to write the block,
     * then just go ahead and acquire the write lock. Otherwise, if another thread is already
     * writing the block, then we wait for the write to finish before acquiring the read lock.
     *
     * [... explain return value here ...]
     */
    def lockNewBlockForWriting(blockId: BlockId, blockInfo: BlockInfo): Boolean = synchronized {
      lockForReading(blockId) match {
        case Some(binfo) =>
          // Block already exists. This could happen if another thread races with us to compute
          // the same block. In this case, just keep the read lock and return.
          false
        case None =>
          // Block does not yet exist or is removed, so we are free to acquire the write lock
          infos(blockId) = blockInfo
          lockForWriting(blockId)
          true
      }
    }
    ```
    To me this is much easier to explain than what you currently have.
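
For illustration only, here is a minimal, self-contained sketch of the first-writer-wins behaviour described above. It is not Spark's `BlockInfoManager`: `ToyBlockInfo`, `ToyLockManager`, `unlockWrite`, `removeBlock`, and `readerCount` are hypothetical names, block ids are plain strings, and per-task lock bookkeeping is left out. It assumes `lockForReading` waits while a writer holds the block and returns `None` when the block does not exist.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's BlockInfoManager.
// Assumption: one optional writer and a reader count per block.
final class ToyBlockInfo {
  var readerCount: Int = 0
  var writerTask: Option[Long] = None
}

final class ToyLockManager {
  private val infos = mutable.HashMap.empty[String, ToyBlockInfo]

  /** Waits while the block is write-locked; returns None if the block does not exist. */
  def lockForReading(blockId: String): Option[ToyBlockInfo] = synchronized {
    // wait() releases this monitor, so a writer can finish and call notifyAll().
    while (infos.get(blockId).exists(_.writerTask.isDefined)) wait()
    infos.get(blockId).map { info =>
      info.readerCount += 1
      info
    }
  }

  /** True: we created the block and hold its write lock. False: we hold a read lock. */
  def lockNewBlockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
    lockForReading(blockId) match {
      case Some(_) =>
        // Another task wrote the block first; keep the read lock just acquired.
        false
      case None =>
        // Block is absent (never written, or removed), so this caller wins.
        val info = new ToyBlockInfo
        info.writerTask = Some(taskId)
        infos(blockId) = info
        true
    }
  }

  /** Writer finished successfully: release the write lock and wake waiters. */
  def unlockWrite(blockId: String): Unit = synchronized {
    infos.get(blockId).foreach(info => info.writerTask = None)
    notifyAll()
  }

  /** Writer failed: drop the block entirely and wake waiters. */
  def removeBlock(blockId: String): Unit = synchronized {
    infos.remove(blockId)
    notifyAll()
  }

  /** Number of read locks currently recorded for the block (0 if absent). */
  def readerCount(blockId: String): Int = synchronized {
    infos.get(blockId).map(_.readerCount).getOrElse(0)
  }
}
```

In this sketch a second caller only ever returns once the first writer has either released the lock (the caller then holds a read lock) or removed the block (the caller then becomes the writer), which is the property the suggestion relies on.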

