mridulm commented on a change in pull request #34632:
URL: https://github.com/apache/spark/pull/34632#discussion_r752505963
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
##########
@@ -319,18 +376,35 @@ private[storage] class BlockInfoManager extends Logging {
*/
def lockNewBlockForWriting(
blockId: BlockId,
- newBlockInfo: BlockInfo): Boolean = synchronized {
+ newBlockInfo: BlockInfo): Boolean = {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
- lockForReading(blockId) match {
- case Some(info) =>
- // Block already exists. This could happen if another thread races with us to compute
- // the same block. In this case, just keep the read lock and return.
- false
- case None =>
- // Block does not yet exist or is removed, so we are free to acquire the write lock
- infos(blockId) = newBlockInfo
- lockForWriting(blockId)
- true
+ // Get the lock that will be associated with the to-be written block and lock it for the entire
+ // duration of this operation. This way we prevent race conditions when two threads try to write
+ // the same block at the same time.
+ val lock = locks.get(blockId)
+ lock.lock()
+ try {
+ val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
+ while (true) {
+ val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+ if (previous == null) {
+ // New block lock it for writing.
+ val result = lockForWriting(blockId, blocking = false)
+ assert(result.isDefined)
Review comment:
Sounds good, I wanted to make sure I understood the behavior now.
I am fine with leaving it as is. The assertion failure, if it happens, will
indicate that this assumption is being violated anyway.
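
For reference, here is a minimal sketch of the invariant the assertion guards, as I read the hunk above. It is not the PR's code: ToyBlockRegistry, Entry, writerTask and tryLockForWriting are hypothetical names, block IDs are plain strings, and the "block already exists" branch simply returns false instead of taking a read lock. The assumption is that a freshly inserted entry cannot already be write-locked, because the insert and the lock attempt both happen while the per-block lock is held.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, simplified stand-in for the manager in the diff; not Spark's code.
final class ToyBlockRegistry {
  // Hypothetical per-block state: which task, if any, holds the write lock.
  final class Entry {
    @volatile var writerTask: Option[Long] = None
  }

  private val locks = new ConcurrentHashMap[String, ReentrantLock]()
  private val entries = new ConcurrentHashMap[String, Entry]()

  // Non-blocking write-lock attempt: returns the entry only if no writer holds it.
  private def tryLockForWriting(blockId: String, taskId: Long): Option[Entry] = {
    val entry = entries.get(blockId)
    if (entry == null) None
    else entry.synchronized {
      if (entry.writerTask.isEmpty) {
        entry.writerTask = Some(taskId)
        Some(entry)
      } else None
    }
  }

  /** Returns true if this caller registered the block and now holds its write lock. */
  def lockNewBlockForWriting(blockId: String, taskId: Long): Boolean = {
    // Get or create the per-block lock and hold it for the whole operation.
    val fresh = new ReentrantLock()
    val lock = Option(locks.putIfAbsent(blockId, fresh)).getOrElse(fresh)
    lock.lock()
    try {
      val previous = entries.putIfAbsent(blockId, new Entry)
      if (previous == null) {
        // New block: nobody else can have locked the entry we just inserted,
        // so a non-blocking attempt is expected to succeed.
        val result = tryLockForWriting(blockId, taskId)
        assert(result.isDefined, s"fresh entry for $blockId was already write-locked")
        true
      } else {
        // Simplification: the block already exists, so report that we lost the race.
        false
      }
    } finally {
      lock.unlock()
    }
  }
}
```

In this toy version the assertion can only fire if some other code path write-locks an entry without holding the per-block lock, which is the kind of violation it would surface.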
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.