sarutak opened a new pull request, #52524:
URL: https://github.com/apache/spark/pull/52524

   ### What changes were proposed in this pull request?
   This PR fixes race conditions between `unlock` and `releaseAllLocksForTask` in `BlockManager`.
   
   When read locks for a block held by a task are released concurrently by `unlock` and `releaseAllLocksForTask`, an assertion error can occur.
   The reason is that `entry.getCount` in `releaseAllLocksForTask` can return a stale value even after another thread has decreased the entry's count via `countsForTask.remove`. As a result, `info.readerCount -= lockCount` can make `readerCount` negative, causing an assertion error.
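The arithmetic behind the negative count can be replayed deterministically on one thread. The sketch below is a hypothetical, simplified model (the names `StaleCountReplay`, `Info`, `replay` and `staleCount` are invented for illustration and are not Spark's `BlockInfoManager` code): `releaseAllLocksForTask` reads the entry's count, `unlock` then releases one read lock, and `releaseAllLocksForTask` finally subtracts either the count it read earlier or the live count.

```scala
// Hypothetical replay of one interleaving of releaseAllLocksForTask and unlock.
// Names are illustrative only; this is not Spark's BlockInfoManager code.
object StaleCountReplay {
  final class Info(var readerCount: Int)

  /** Returns the block's final readerCount after the interleaving below. */
  def replay(initialLocks: Int, staleCount: Boolean): Int = {
    // The task holds `initialLocks` read locks on one block; no other task reads it.
    val info = new Info(initialLocks)
    var countInMultiset = initialLocks

    // releaseAllLocksForTask: obtains the entry's count before unlock runs.
    val countFromEntry = countInMultiset

    // unlock (another thread): releases one read lock.
    info.readerCount -= 1
    countInMultiset -= 1

    // releaseAllLocksForTask: subtracts either the cached or the live count.
    val lockCount = if (staleCount) countFromEntry else countInMultiset
    info.readerCount -= lockCount
    info.readerCount
  }
}
```

With two read locks, the stale path ends at `-1` while the live-count path ends at `0`, which is the difference the fix relies on.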
   
   This issue can be reproduced by inserting sleeps into `unlock` and `releaseAllLocksForTask` as follows.
   
   * unlock
   ```
      // reader counts. We need to check if the readLocksByTask per tasks are present, if they
      // are not then we know releaseAllLocksForTask has already cleaned up the read lock.
      val countsForTask = readLocksByTask.get(taskAttemptId)
   +  Thread.sleep(5)
      if (countsForTask != null) {
        assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
        info.readerCount -= 1
   ```
   
   * releaseAllLocksForTask
   ```
   +  Thread.sleep(5)
      val readLocks = Option(readLocksByTask.remove(taskAttemptId))
        .getOrElse(ImmutableMultiset.of[BlockId])
      readLocks.entrySet().forEach { entry =>
        val blockId = entry.getElement
        val lockCount = entry.getCount
   +    Thread.sleep(5)
        blocksWithReleasedLocks += blockId
   ```
   
   The Javadoc for [ConcurrentHashMultiset#entrySet](https://guava.dev/releases/33.4.0-jre/api/docs/com/google/common/collect/ConcurrentHashMultiset.html) states:
   
   ```
   However, multiset changes may or may not be reflected in any Entry instances already retrieved from the entry set (this is implementation-dependent)
   ```
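The same "may or may not be reflected" behavior can be observed with a JDK map. The sketch below uses `java.util.concurrent.ConcurrentHashMap` as a stand-in for Guava's `ConcurrentHashMultiset` (an assumption made only to keep the example dependency-free; `EntrySnapshotDemo`, `cachedVsLive` and the key `"rdd_0_0"` are hypothetical). An entry is retrieved first, the count is then updated, and the entry's value is compared with a fresh lookup. With OpenJDK's implementation the retrieved entry typically still reports the old value, but only the fresh lookup is guaranteed to see the update.

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative only: ConcurrentHashMap stands in for Guava's ConcurrentHashMultiset.
object EntrySnapshotDemo {
  /** Returns (count seen through a previously retrieved entry, live count from the map). */
  def cachedVsLive(): (Int, Int) = {
    val counts = new ConcurrentHashMap[String, Integer]()
    counts.put("rdd_0_0", 2)

    // Like releaseAllLocksForTask: retrieve an entry and keep it around.
    val entry = counts.entrySet().iterator().next()

    // Like unlock on another thread: one read lock is released.
    counts.put("rdd_0_0", 1)

    // The entry's value may be stale; a fresh lookup reflects the update.
    (entry.getValue().intValue(), counts.get("rdd_0_0").intValue())
  }
}
```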
   
   Therefore, this PR computes `lockCount` with `readLocks.count`, which returns the latest count, and performs this lookup inside the `blockInfo` block so that it executes exclusively.
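The read-lock part of the fix can be sketched on a simplified model. This is a hypothetical illustration, not the patched Spark code: a plain `mutable.Map[String, Int]` stands in for the task's multiset, a single `lock` object stands in for the per-block `blockInfo` lock, and `ReadLockReleaseSketch`, `Info` and `releaseReadLocks` are invented names. The key points are that the count is re-read under the lock instead of being taken from a cached entry, and that a count of `0` (already released by a concurrent `unlock`) is skipped.

```scala
import scala.collection.mutable

// Hypothetical simplified model of the read-lock part of the fix.
object ReadLockReleaseSketch {
  final class Info(var readerCount: Int)

  /** Releases the task's read locks; returns the blocks whose locks were released here. */
  def releaseReadLocks(
      readLocks: mutable.Map[String, Int], // blockId -> read locks held by the task
      infos: Map[String, Info],
      lock: AnyRef): Seq[String] = {
    val released = mutable.ArrayBuffer.empty[String]
    for (blockId <- readLocks.keys.toList) {
      lock.synchronized {
        // Re-read the live count under the lock instead of trusting a cached entry.
        val lockCount = readLocks.getOrElse(blockId, 0)
        // 0 means a concurrent unlock already released these read locks.
        if (lockCount > 0) {
          released += blockId
          infos(blockId).readerCount -= lockCount
        }
      }
    }
    released.toSeq
  }
}
```

In this model, a block whose count has already dropped to `0` keeps its `readerCount` untouched, so the subtraction can no longer go below zero.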
   
   A similar race condition can also occur for write locks.
   While `releaseAllLocksForTask` iterates with `writeLocks.forEach`, `unlock` on another thread can remove a `blockId` from `writeLocks` via `writeLocksByTask.get(taskAttemptId).remove(blockId)`.
   This issue can be reproduced with the new test added in this PR.
   This PR fixes it by checking the existence of the `blockId` with `writeLocks.contains(info)` within the `blockInfo` block.
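The write-lock check can be sketched the same way. In this hypothetical model (`WriteLockReleaseSketch`, `Info`, `NoWriter` and `releaseWriteLocks` are invented names; a single `lock` object stands in for the per-block `blockInfo` lock), `iterated` is what the loop over `writeLocks` happened to observe, while `writeLocks` is the live set that a concurrent `unlock` may already have shrunk. Each block is released only if it is still present in the live set when the lock is held.

```scala
import scala.collection.mutable

// Hypothetical simplified model of the write-lock part of the fix.
object WriteLockReleaseSketch {
  val NoWriter: Long = -1L
  final class Info(var writerTask: Long)

  /** `iterated`: blocks observed by the loop; `writeLocks`: the live set of write-locked blocks. */
  def releaseWriteLocks(
      taskAttemptId: Long,
      iterated: Seq[String],
      writeLocks: java.util.Set[String],
      infos: Map[String, Info],
      lock: AnyRef): Seq[String] = {
    val released = mutable.ArrayBuffer.empty[String]
    iterated.foreach { blockId =>
      lock.synchronized {
        // Skip blocks whose write lock a concurrent unlock has already released.
        if (writeLocks.contains(blockId)) {
          val info = infos(blockId)
          assert(info.writerTask == taskAttemptId)
          info.writerTask = NoWriter
          released += blockId
        }
      }
    }
    released.toSeq
  }
}
```

In this model, dropping the `contains` check would make the assertion fire for a block that `unlock` already released, because its writer has been reset to `NoWriter`.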
   
   
   ### Why are the changes needed?
   Bug fix.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Confirmed that `SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls should not fail` passes even when sleeps are inserted into `unlock` and `releaseAllLocksForTask` as follows.
   
   * unlock
   ```
    val countsForTask = readLocksByTask.get(taskAttemptId)
    if (countsForTask != null) {
   +  Thread.sleep(5)
      assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
      info.readerCount -= 1
      val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
   ```
   
   * releaseAllLocksForTask
   ```
   +  Thread.sleep(5)
      val readLocks = Option(readLocksByTask.remove(taskAttemptId))
        .getOrElse(ImmutableMultiset.of[BlockId])
      readLocks.entrySet().forEach { entry =>
        val blockId = entry.getElement
   ```
   ```
     // Using readLocks.count instead of entry.getCount is intentional. See discussion in
     // SPARK-50771.
     val lockCount = readLocks.count(blockId)
   + Thread.sleep(5)
   
     // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
     if (lockCount > 0) {
       blocksWithReleasedLocks += blockId
       info.readerCount -= lockCount
   ```
   
   A new test for write locks is also added.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.