Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/11436#discussion_r54627020
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---
@@ -307,29 +307,48 @@ private[storage] class BlockInfoManager extends Logging {
   /**
    * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
-   * exist.
+   * exist. If the block already exists, then this returns a read lock on that block.
+   *
+   * If another task already holds a write lock on this block, then this method will block until
+   * that other thread releases or downgrades the lock.
    *
    * @param blockId the block id.
    * @param newBlockInfo the block info for the new block.
    * @return true if the block did not already exist, false otherwise. If this returns false, then
-   *         no new locks are acquired. If this returns true, a write lock on the new block will
-   *         be held.
+   *         a read lock on the existing block will be held. If this returns true, a write lock on
+   *         the new block will be held.
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
       newBlockInfo: BlockInfo): Boolean = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
-    if (!infos.contains(blockId)) {
-      infos(blockId) = newBlockInfo
-      newBlockInfo.writerTask = currentTaskAttemptId
-      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
-      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
-      true
-    } else {
-      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
-        s"because that block already exists")
-      false
+    while (true) {
+      if (!infos.contains(blockId)) {
+        // If there is no blockInfo for the given block, then the block does not exist and the
+        // caller won the race to write this block, so create a new blockInfo and lock the block
+        // for writing.
+        infos(blockId) = newBlockInfo
+        newBlockInfo.writerTask = currentTaskAttemptId
+        writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+        logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
+        return true
+      } else {
+        // Otherwise, the block already exists. If the block is locked for writing, however, it's
+        // possible that the writer might delete the block or might be in the process of writing the
+        // block (which could fail); in this case, we choose to block until that write lock is
+        // released so we know whether the block is available for reading before returning. This
+        // semantic is useful in the implementation of the BlockManager's getOrElseUpdate() method.
+        if (lockForReading(blockId).isDefined) {
--- End diff --
This works, but I think there's a cleaner way to do the same thing without the while loop:
```
/**
 * Attempt to acquire the appropriate lock for writing a new block.
 *
 * This enforces the first-writer-wins semantics. If we are the first to write the block,
 * then just go ahead and acquire the write lock. Otherwise, if another thread is already
 * writing the block, then we wait for the write to finish before acquiring the read lock.
 *
 * [... explain return value here ...]
 */
def lockNewBlockForWriting(blockId: BlockId, blockInfo: BlockInfo): Boolean = synchronized {
  lockForReading(blockId) match {
    case Some(binfo) =>
      // Block already exists. This could happen if another thread races with us to compute
      // the same block. In this case, just keep the read lock and return.
      false
    case None =>
      // Block does not yet exist or is removed, so we are free to acquire the write lock
      infos(blockId) = blockInfo
      lockForWriting(blockId)
      true
  }
}
```
To me this is much easier to explain than what you currently have.
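
For readers less familiar with why the first-writer-wins behavior matters, here is a minimal, self-contained sketch of the caller-side pattern it enables: a getOrElseUpdate-style method where only the first writer computes the block and later callers wait and then read. This is not Spark's actual code; `SimpleBlockRegistry`, `GetOrElseUpdateSketch`, and their members are illustrative stand-ins that only model the outcome of the locking, not the real read/write-lock bookkeeping (and a failed or removed write is not modeled either):
```
import scala.collection.mutable

// Illustrative stand-in for BlockInfoManager + the block store.
class SimpleBlockRegistry {
  // None = block claimed by a writer but not yet written; Some(v) = block written.
  private val blocks = mutable.Map[String, Option[Int]]()

  // Returns true if the caller won the race and must compute the block; returns false
  // only once the block is readable (in the real code this is where we would wait for
  // an in-progress writer to release its write lock and then take a read lock).
  def lockNewBlockForWriting(blockId: String): Boolean = synchronized {
    if (!blocks.contains(blockId)) {
      blocks(blockId) = None                   // claim the block: later callers lose the race
      true
    } else {
      while (blocks(blockId).isEmpty) wait()   // wait for the winner to publish its value
      false
    }
  }

  def put(blockId: String, value: Int): Unit = synchronized {
    blocks(blockId) = Some(value)
    notifyAll()                                // wake up callers waiting in lockNewBlockForWriting
  }

  def get(blockId: String): Option[Int] = synchronized { blocks.get(blockId).flatten }
}

object GetOrElseUpdateSketch {
  private val registry = new SimpleBlockRegistry

  // getOrElseUpdate-style caller: only the first writer evaluates `compute`.
  def getOrElseUpdate(blockId: String)(compute: => Int): Int = {
    if (registry.lockNewBlockForWriting(blockId)) {
      val value = compute
      registry.put(blockId, value)
      value
    } else {
      // We lost the race; by the time lockNewBlockForWriting returned false,
      // the winner's value was already readable.
      registry.get(blockId).get
    }
  }

  def main(args: Array[String]): Unit = {
    println(getOrElseUpdate("rdd_0_0") { 42 })                                 // computes 42
    println(getOrElseUpdate("rdd_0_0") { sys.error("should not recompute") })  // reuses 42
  }
}
```
Either version of `lockNewBlockForWriting` (the `while` loop in the diff or the `lockForReading`-first variant above) gives the caller the same guarantee: when it returns false, the existing block is readable, so the loser of the race can skip the computation entirely.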