GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11436#discussion_r54658561
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---
    @@ -307,29 +307,48 @@ private[storage] class BlockInfoManager extends Logging {
     
       /**
        * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
    -   * exist.
    +   * exist. If the block already exists, then this returns a read lock on that block.
    +   *
    +   * If another task already holds a write lock on this block, then this method will block until
    +   * that other thread releases or downgrades the lock.
        *
        * @param blockId the block id.
        * @param newBlockInfo the block info for the new block.
        * @return true if the block did not already exist, false otherwise. If this returns false, then
    -   *         no new locks are acquired. If this returns true, a write lock on the new block will
    -   *         be held.
    +   *         a read lock on the existing block will be held. If this returns true, a write lock on
    +   *         the new block will be held.
        */
       def lockNewBlockForWriting(
           blockId: BlockId,
           newBlockInfo: BlockInfo): Boolean = synchronized {
         logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
    -    if (!infos.contains(blockId)) {
    -      infos(blockId) = newBlockInfo
    -      newBlockInfo.writerTask = currentTaskAttemptId
    -      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    -      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    -      true
    -    } else {
    -      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
    -        s"because that block already exists")
    -      false
    +    while(true) {
    +      if (!infos.contains(blockId)) {
    +        // If there is no blockInfo for the given block, then the block does not exist and the
    +        // caller won the race to write this block, so create a new blockInfo and lock the block
    +        // for writing.
    +        infos(blockId) = newBlockInfo
    +        newBlockInfo.writerTask = currentTaskAttemptId
    +        writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
    +        logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
    +        return true
    +      } else {
    +        // Otherwise, the block already exists. If the block is locked for writing, however, it's
    +        // possible that the writer might delete the block or might be in the process of writing the
    +        // block (which could fail); in this case, we choose to block until that write lock is
    +        // released so we know whether the block is available for reading before returning. This
    +        // semantic is useful in the implementation of the BlockManager's getOrElseUpdate() method.
    +        if (lockForReading(blockId).isDefined) {
    --- End diff --
    
    Andrew and I discussed this offline. The `while` loop here is actually necessary in the current code because of how `lockForReading` behaves when the writer removes the block, but I can make some changes in `lockForReading` itself that will let me simplify this. Stay tuned for an update.
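To make the retry concrete, here is a minimal, hypothetical Scala sketch (not Spark's code) of why the loop is needed: if `lockForReading` waits for the writer and then finds the block gone, it reports failure, so the caller has to go around again and try to create the block itself. It assumes a single JVM monitor guards all state, as `synchronized` does in `BlockInfoManager`. `SimpleBlockLocks`, `BlockState`, `unlockWrite`, `removeBlock`, `readers`, and `writerOf` are illustrative names; block ids are plain strings; releasing read locks is omitted.

```scala
import scala.collection.mutable

// Hypothetical, simplified per-block lock state (not Spark's BlockInfo).
final class BlockState {
  var writerTask: Option[Long] = None // task holding the write lock, if any
  var readerCount: Int = 0            // number of read locks currently held
}

// Sketch only: one monitor (this) guards `infos`, mirroring `synchronized` methods.
final class SimpleBlockLocks {
  private val infos = mutable.HashMap.empty[String, BlockState]

  /** Waits while another task holds the write lock. Returns true with a read lock
    * held, or false if the block is absent (e.g. the writer removed it meanwhile). */
  def lockForReading(blockId: String): Boolean = synchronized {
    var result: Option[Boolean] = None
    while (result.isEmpty) {
      infos.get(blockId) match {
        case None =>
          result = Some(false)
        case Some(state) if state.writerTask.isEmpty =>
          state.readerCount += 1
          result = Some(true)
        case Some(_) =>
          wait() // releases the monitor until a notifyAll(); loop rechecks state
      }
    }
    result.get
  }

  /** True: the block was new and this task holds its write lock.
    * False: the block already existed and this task now holds a read lock. */
  def lockNewBlockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
    var outcome: Option[Boolean] = None
    while (outcome.isEmpty) {
      if (!infos.contains(blockId)) {
        val state = new BlockState
        state.writerTask = Some(taskId)
        infos(blockId) = state
        outcome = Some(true)
      } else if (lockForReading(blockId)) {
        outcome = Some(false)
      }
      // Otherwise the writer removed the block while we waited: retry the loop,
      // which now finds the block absent and creates it.
    }
    outcome.get
  }

  def unlockWrite(blockId: String): Unit = synchronized {
    infos.get(blockId).foreach(state => state.writerTask = None)
    notifyAll()
  }

  def removeBlock(blockId: String): Unit = synchronized {
    infos.remove(blockId)
    notifyAll()
  }

  def readers(blockId: String): Int = synchronized {
    infos.get(blockId).map(_.readerCount).getOrElse(0)
  }

  def writerOf(blockId: String): Option[Long] = synchronized {
    infos.get(blockId).flatMap(_.writerTask)
  }
}
```

In this sketch, if task 1 holds the write lock and task 2 calls `lockNewBlockForWriting`, task 2 waits; if task 1 then calls `removeBlock`, task 2 wakes, sees the block missing, and becomes the new writer on the second loop iteration. Moving the "block disappeared" handling into `lockForReading` itself is one way the loop could be dropped, which seems to be the direction described above.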

