GitHub user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11436#discussion_r54504911
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---
    @@ -307,29 +307,48 @@ private[storage] class BlockInfoManager extends Logging {
     
       /**
        * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
    -   * exist.
    +   * exist. If the block already exists, then this returns a read lock on that block.
    +   *
    +   * If another task already holds a write lock on this block, then this method will block until
    +   * that other thread releases or downgrades the lock.
        *
        * @param blockId the block id.
        * @param newBlockInfo the block info for the new block.
        * @return true if the block did not already exist, false otherwise. If this returns false, then
    -   *         no new locks are acquired. If this returns true, a write lock on the new block will
    -   *         be held.
    +   *         a read lock on the existing block will be held. If this returns true, a write lock on
    +   *         the new block will be held.
        */
       def lockNewBlockForWriting(
           blockId: BlockId,
           newBlockInfo: BlockInfo): Boolean = synchronized {
         logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
    -    if (!infos.contains(blockId)) {
    -      infos(blockId) = newBlockInfo
    -      newBlockInfo.writerTask = currentTaskAttemptId
    -      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    -      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    -      true
    -    } else {
    -      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
    -        s"because that block already exists")
    -      false
    +    while(true) {
    +      if (!infos.contains(blockId)) {
    +        // If there is no blockInfo for the given block, then the block does not exist and the
    +        // caller won the race to write this block, so create a new blockInfo and lock the block
    +        // for writing.
    +        infos(blockId) = newBlockInfo
    +        newBlockInfo.writerTask = currentTaskAttemptId
    +        writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    +        logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    +        return true
    +      } else {
    +        // Otherwise, the block already exists. If the block is locked for writing, however, it's
    --- End diff --
    
    The second sentence in this comment is hard to parse.
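To make the contract in the revised Scaladoc concrete (new block: hold a write lock and return true; existing block: hold a read lock and return false; another task writing: wait until it releases or downgrades), here is a minimal single-monitor sketch in Scala. It is not Spark's `BlockInfoManager`: the class `SimpleBlockLocks`, its `Info` record, the `String` block ids, and the `downgradeLock`/`unlockWrite`/`readerCount` helpers are hypothetical simplifications. The diff is cut off before the waiting branch, so the `wait()`/`notifyAll()` handling below is an assumption about how such a loop could complete.

```scala
import scala.collection.mutable

/** Hypothetical, simplified lock table (NOT Spark's BlockInfoManager). */
final class SimpleBlockLocks {
  // Hypothetical per-block state: at most one writer task, any number of readers.
  private final class Info(var writerTask: Long, var readerCount: Int = 0)

  private val NoWriter = -1L
  private val infos = mutable.HashMap.empty[String, Info]

  /** Returns true (holding a write lock) if the block was new; otherwise waits
    * for any in-progress writer and returns false (holding a read lock). */
  def lockNewBlockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
    var result: Option[Boolean] = None
    // Re-check the state after every wakeup: wait() may return spuriously.
    while (result.isEmpty) {
      infos.get(blockId) match {
        case None =>
          // The caller won the race: register the block and hold the write lock.
          infos(blockId) = new Info(writerTask = taskId)
          result = Some(true)
        case Some(info) if info.writerTask == NoWriter =>
          // The block exists and nobody is writing it: take a read lock.
          info.readerCount += 1
          result = Some(false)
        case Some(_) =>
          // Another task holds the write lock: release the monitor and wait.
          wait()
      }
    }
    result.get
  }

  /** Converts the caller's write lock into a read lock and wakes any waiters. */
  def downgradeLock(blockId: String, taskId: Long): Unit = synchronized {
    val info = infos(blockId)
    require(info.writerTask == taskId, s"task $taskId does not hold the write lock on $blockId")
    info.writerTask = NoWriter
    info.readerCount += 1
    notifyAll()
  }

  /** Releases the caller's write lock and wakes any waiters. */
  def unlockWrite(blockId: String, taskId: Long): Unit = synchronized {
    val info = infos(blockId)
    require(info.writerTask == taskId, s"task $taskId does not hold the write lock on $blockId")
    info.writerTask = NoWriter
    notifyAll()
  }

  /** Number of read locks currently held on the block (0 if it is unknown). */
  def readerCount(blockId: String): Int = synchronized {
    infos.get(blockId).map(_.readerCount).getOrElse(0)
  }
}
```

Re-checking the block's state inside the loop after each `wait()` is what makes a `while` loop necessary here: a woken waiter must look again at whether a writer is still present before deciding which lock it ends up holding.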

