junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r485026583
########## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ########## @@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { assert(watchKeys.nonEmpty, "The watch key list can't be empty") - // The cost of tryComplete() is typically proportional to the number of keys. Calling - // tryComplete() for each key is going to be expensive if there are many keys. Instead, - // we do the check in the following way. Call tryComplete(). If the operation is not completed, - // we just add the operation to all keys. Then we call tryComplete() again. At this time, if - // the operation is still not completed, we are guaranteed that it won't miss any future triggering - // event since the operation is already on the watcher list for all keys. This does mean that - // if the operation is completed (by another thread) between the two tryComplete() calls, the - // operation is unnecessarily added for watch. However, this is a less severe issue since the - // expire reaper will clean it up periodically. - - // At this point the only thread that can attempt this operation is this current thread - // Hence it is safe to tryComplete() without a lock - var isCompletedByMe = operation.tryComplete() - if (isCompletedByMe) - return true - - var watchCreated = false - for(key <- watchKeys) { - // If the operation is already completed, stop adding it to the rest of the watcher list. - if (operation.isCompleted) - return false - watchForOperation(key, operation) - - if (!watchCreated) { - watchCreated = true - estimatedTotalOperations.incrementAndGet() - } - } - - isCompletedByMe = operation.maybeTryComplete() - if (isCompletedByMe) - return true + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. 
Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch + // 3) thread_a adds op to watch list + // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) + // 5) thread_c calls checkAndComplete() and holds lock of op + // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) + // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by thread_c) Review comment: to call safeTryComplete => to call the final tryComplete() ########## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ########## @@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest @Test def testConcurrentTxnGoodPathSequence(): Unit = { - verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn) + verifyConcurrentOperations(createGroupMembers, Seq( + new JoinGroupOperation, + new SyncGroupOperation, + new OffsetFetchOperation, + new CommitTxnOffsetsOperation, + new CompleteTxnOperation, + new HeartbeatOperation, + new LeaveGroupOperation + )) } @Test def 
testConcurrentRandomSequence(): Unit = { - verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn) + /** + * handleTxnCommitOffsets does not complete delayed requests now so it causes error if handleTxnCompletion is executed Review comment: causes error => causes an error ########## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ########## @@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest @Test def testConcurrentTxnGoodPathSequence(): Unit = { - verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn) + verifyConcurrentOperations(createGroupMembers, Seq( + new JoinGroupOperation, + new SyncGroupOperation, + new OffsetFetchOperation, + new CommitTxnOffsetsOperation, + new CompleteTxnOperation, + new HeartbeatOperation, + new LeaveGroupOperation + )) } @Test def testConcurrentRandomSequence(): Unit = { - verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn) + /** + * handleTxnCommitOffsets does not complete delayed requests now so it causes error if handleTxnCompletion is executed + * before completing delayed request. In random mode, we use this global lock to prevent such error. Review comment: such error => such an error ########## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ########## @@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { assert(watchKeys.nonEmpty, "The watch key list can't be empty") - // The cost of tryComplete() is typically proportional to the number of keys. Calling - // tryComplete() for each key is going to be expensive if there are many keys. Instead, - // we do the check in the following way. Call tryComplete(). If the operation is not completed, - // we just add the operation to all keys. Then we call tryComplete() again. 
At this time, if - // the operation is still not completed, we are guaranteed that it won't miss any future triggering - // event since the operation is already on the watcher list for all keys. This does mean that - // if the operation is completed (by another thread) between the two tryComplete() calls, the - // operation is unnecessarily added for watch. However, this is a less severe issue since the - // expire reaper will clean it up periodically. - - // At this point the only thread that can attempt this operation is this current thread - // Hence it is safe to tryComplete() without a lock - var isCompletedByMe = operation.tryComplete() - if (isCompletedByMe) - return true - - var watchCreated = false - for(key <- watchKeys) { - // If the operation is already completed, stop adding it to the rest of the watcher list. - if (operation.isCompleted) - return false - watchForOperation(key, operation) - - if (!watchCreated) { - watchCreated = true - estimatedTotalOperations.incrementAndGet() - } - } - - isCompletedByMe = operation.maybeTryComplete() - if (isCompletedByMe) - return true + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). 
// For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch Review comment: tryCompleteElseWatch => tryCompleteElseWatch() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org