hvanhovell commented on a change in pull request #34632:
URL: https://github.com/apache/spark/pull/34632#discussion_r752140190



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
##########
@@ -319,18 +376,35 @@ private[storage] class BlockInfoManager extends Logging {
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
-      newBlockInfo: BlockInfo): Boolean = synchronized {
+      newBlockInfo: BlockInfo): Boolean = {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
-    lockForReading(blockId) match {
-      case Some(info) =>
-        // Block already exists. This could happen if another thread races with us to compute
-        // the same block. In this case, just keep the read lock and return.
-        false
-      case None =>
-        // Block does not yet exist or is removed, so we are free to acquire the write lock
-        infos(blockId) = newBlockInfo
-        lockForWriting(blockId)
-        true
+    // Get the lock that will be associated with the to-be written block and lock it for the entire
+    // duration of this operation. This way we prevent race conditions when two threads try to write
+    // the same block at the same time.
+    val lock = locks.get(blockId)
+    lock.lock()
+    try {
+      val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
+      while (true) {
+        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        if (previous == null) {
+          // New block lock it for writing.
+          val result = lockForWriting(blockId, blocking = false)
+          assert(result.isDefined)

Review comment:
       So my reasoning is this: we need to hold the lock for a block when we try to add it to the map, so nothing else can modify the block in the meantime. If the value returned is null it means it is a new block, and in that case we should be able to acquire the lock without any issue (this is also the reason why we use `blocking = false`).
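
   The reasoning above can be sketched roughly as follows. This is a minimal, hypothetical illustration (the names `LockFirstSketch`, `lockNewEntry`, `lockFor` are made up for this sketch, not Spark's actual `BlockInfoManager` API): the per-key lock is taken *before* `putIfAbsent`, so when `putIfAbsent` reports a brand-new entry, no other thread can have touched it yet, and a non-blocking lock acquisition on it cannot fail.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.locks.ReentrantLock

   // Hypothetical sketch of the lock-first pattern; not Spark's actual code.
   object LockFirstSketch {
     private val locks = new ConcurrentHashMap[String, ReentrantLock]()
     private val infos = new ConcurrentHashMap[String, String]()

     // One lock per key, created lazily.
     private def lockFor(key: String): ReentrantLock =
       locks.computeIfAbsent(key, _ => new ReentrantLock())

     /** Returns true if this call installed a new entry for `key`. */
     def lockNewEntry(key: String, info: String): Boolean = {
       val lock = lockFor(key)
       lock.lock()
       try {
         val isNew = infos.putIfAbsent(key, info) == null
         // While we hold `lock`, no other thread can race on this key's
         // entry: any competitor must first acquire the same lock. So if
         // `isNew` is true, the entry is guaranteed untouched by others.
         isNew
       } finally {
         lock.unlock()
       }
     }
   }
   ```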
   
   I don't mind being defensive here. I can add a retry if you think we should.
   
   TBH this is one of the more subtle changes in this PR. One of the problems is that the caller relies on the fact that the `BlockInfo` object passed in is also the one persisted in the `BlockInfoManager` if you acquire the write lock. If you locked the block after `putIfAbsent`, you would have to deal with the situation where another thread might 'steal' the lock and take out a read lock on an uninitialized block (or a few other weird scenarios). While there are probably ways to deal with that, I opted for a simpler approach where this is not a problem.
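
   For contrast, the rejected ordering can be sketched as below. This is a hypothetical illustration (the names `PublishThenLockSketch`, `Entry`, `riskyPut` are invented for this sketch, not Spark code): publishing the entry via `putIfAbsent` first and locking afterwards leaves a window in which another thread can see the entry, grab its lock first, and observe it uninitialized.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.locks.ReentrantLock

   // Hypothetical sketch of the publish-then-lock ordering (the hazardous one).
   object PublishThenLockSketch {
     final class Entry {
       val lock = new ReentrantLock()
       @volatile var initialized = false
     }

     private val entries = new ConcurrentHashMap[String, Entry]()

     def riskyPut(key: String): Entry = {
       val fresh = new Entry
       // Publish first; fall back to the existing entry if we lost the race.
       val entry = Option(entries.putIfAbsent(key, fresh)).getOrElse(fresh)
       // RACE WINDOW: the entry is now visible map-wide, but we do not yet
       // hold entry.lock. Another thread can lock it here and observe
       // initialized == false ("steal" the lock on an uninitialized block).
       entry.lock.lock()
       try {
         entry.initialized = true
         entry
       } finally {
         entry.lock.unlock()
       }
     }
   }
   ```

   In a single-threaded run this behaves fine, which is exactly what makes the window easy to miss; locking before the `putIfAbsent` closes it entirely.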
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


