mridulm commented on a change in pull request #34632:
URL: https://github.com/apache/spark/pull/34632#discussion_r752505963



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
##########
@@ -319,18 +376,35 @@ private[storage] class BlockInfoManager extends Logging {
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
-      newBlockInfo: BlockInfo): Boolean = synchronized {
+      newBlockInfo: BlockInfo): Boolean = {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
-    lockForReading(blockId) match {
-      case Some(info) =>
-        // Block already exists. This could happen if another thread races with us to compute
-        // the same block. In this case, just keep the read lock and return.
-        false
-      case None =>
-        // Block does not yet exist or is removed, so we are free to acquire the write lock
-        infos(blockId) = newBlockInfo
-        lockForWriting(blockId)
-        true
+    // Get the lock that will be associated with the to-be written block and lock it for the entire
+    // duration of this operation. This way we prevent race conditions when two threads try to write
+    // the same block at the same time.
+    val lock = locks.get(blockId)
+    lock.lock()
+    try {
+      val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
+      while (true) {
+        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        if (previous == null) {
+          // New block lock it for writing.
+          val result = lockForWriting(blockId, blocking = false)
+          assert(result.isDefined)

Review comment:
   Sounds good, I just wanted to make sure I understand the behavior now.
   I am fine with leaving it as is: the assertion failure, if it happens, will indicate that this assumption is being violated.
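
   For reference, the assumption behind the assertion can be sketched in isolation. This is a minimal illustration only, not the code in this PR: `Entry`, `PutIfAbsentSketch`, `lockNewForWriting` and the per-key lock map are hypothetical stand-ins, and the branch for an already existing block is reduced to returning `false`. The point is that while the per-key lock is held, an entry that `putIfAbsent` has just inserted cannot have acquired a writer yet, so taking the write lock without blocking is expected to succeed.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for per-block state; this is not Spark's BlockInfo.
final class Entry(val lock: ReentrantLock) {
  var writer: Long = -1L // -1 means "no writer"; only touched while `lock` is held
}

object PutIfAbsentSketch {
  private val entries = new ConcurrentHashMap[String, Entry]()
  // Simplification: one lock per key. The diff obtains its lock from `locks.get(blockId)`.
  private val locks = new ConcurrentHashMap[String, ReentrantLock]()

  private def lockFor(key: String): ReentrantLock = {
    val candidate = new ReentrantLock()
    val existing = locks.putIfAbsent(key, candidate)
    if (existing == null) candidate else existing
  }

  /** Returns true if the caller inserted the entry and now owns the write lock. */
  def lockNewForWriting(key: String, taskId: Long): Boolean = {
    val lock = lockFor(key)
    lock.lock()
    try {
      val entry = new Entry(lock)
      val previous = entries.putIfAbsent(key, entry)
      if (previous == null) {
        // We hold `lock` and just inserted `entry`, so nobody else can have
        // become its writer. This mirrors the assumption the assertion checks.
        assert(entry.writer == -1L, s"fresh entry for $key already has a writer")
        entry.writer = taskId
        true
      } else {
        // An entry already exists; the real method's handling is omitted here.
        false
      }
    } finally {
      lock.unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    println(lockNewForWriting("rdd_0_0", taskId = 1L)) // prints true
    println(lockNewForWriting("rdd_0_0", taskId = 2L)) // prints false
  }
}
```

   If the assertion in a sketch like this ever fired, it would mean some other code path had modified the entry without holding the per-key lock, which is the kind of violation the assertion in the diff would surface.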
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


