junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r484521628



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +211,37 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock in practice if we don't hold the lock 
while adding the operation to watch
+    // list and do the final tryComplete() check. For example,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from 
TransactionStateManager (blocked by thread_a)
+    // 5) thread_c holds lock of op (from watch list)
+    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by 
thread_b)
+    // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by 
thread_c)
+    //
+    // Noted that current approach can't prevent all deadlock. For example,
+    // 1) thread_a gets lock of op
+    // 2) thread_a adds op to watch list
+    // 3) thread_a calls op#tryComplete (and it requires lock_b)

Review comment:
       it requires lock_b => it tries to require lock_b

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,41 +102,22 @@ abstract class DelayedOperation(override val delayMs: 
Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock 
and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes 
deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the 
delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails 
to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to 
complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used 
to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed 
operations (e.g. DelayedJoin) pick up and then
+   * execute delayed actions when no lock is held.

Review comment:
       Since safeTryComplete() is no longer used in tryCompleteElseWatch(), the 
above comment is not completely relevant. Perhaps we could just explain what 
this method does: "Thread-safe variant of tryComplete()."

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +211,37 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock in practice if we don't hold the lock 
while adding the operation to watch

Review comment:
       There is a potential deadlock => There is a potential deadlock between 
the callers to tryCompleteElseWatch() and checkAndComplete() 
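
A minimal sketch of the two entry points involved, assuming a heavily simplified purgatory. `WatchedOp`, `SketchPurgatory`, and the `ready` callback are hypothetical stand-ins, not the actual Kafka classes. The point is only that `tryCompleteElseWatch()` keeps the operation's lock while it adds the operation to the watch list and performs the final check, so a concurrent `checkAndComplete()` caller cannot take the operation's lock in between.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, simplified stand-in for a delayed operation.
final class WatchedOp(ready: () => Boolean) {
  private val lock = new ReentrantLock()
  private val completed = new AtomicBoolean(false)

  def isCompleted: Boolean = completed.get()

  // Completes at most once, and only when the completion criteria are met.
  private def tryComplete(): Boolean =
    ready() && completed.compareAndSet(false, true)

  // Thread-safe variant of tryComplete(): blocks until the lock is held.
  def safeTryComplete(): Boolean = {
    lock.lock()
    try tryComplete()
    finally lock.unlock()
  }

  // Check, register for watch, then check again, all under the same lock.
  def safeTryCompleteOrElse(watch: => Unit): Boolean = {
    lock.lock()
    try {
      if (tryComplete()) true
      else {
        watch
        tryComplete() // final check after the operation became visible to others
      }
    } finally lock.unlock()
  }
}

// Hypothetical, simplified stand-in for the purgatory (no timer, no reaper).
final class SketchPurgatory {
  private val watchers =
    new ConcurrentHashMap[String, ConcurrentLinkedQueue[WatchedOp]]()

  def tryCompleteElseWatch(op: WatchedOp, keys: Seq[String]): Boolean =
    op.safeTryCompleteOrElse {
      keys.foreach { key =>
        watchers
          .computeIfAbsent(key, _ => new ConcurrentLinkedQueue[WatchedOp]())
          .add(op)
      }
    }

  // Returns the number of operations completed by this call.
  def checkAndComplete(key: String): Int = {
    val ops = watchers.get(key)
    if (ops == null) 0
    else {
      var completedHere = 0
      val it = ops.iterator()
      while (it.hasNext) {
        val op = it.next()
        if (op.isCompleted) it.remove()
        else if (op.safeTryComplete()) {
          it.remove()
          completedHere += 1
        }
      }
      completedHere
    }
  }
}
```

In this sketch an operation that is not yet ready is registered under its keys, and a later `checkAndComplete(key)` completes and removes it once `ready()` returns true.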

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +211,37 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock in practice if we don't hold the lock 
while adding the operation to watch
+    // list and do the final tryComplete() check. For example,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from 
TransactionStateManager (blocked by thread_a)
+    // 5) thread_c holds lock of op (from watch list)

Review comment:
       thread_c holds lock of op => thread_c calls checkAndComplete() and 
holds lock of op

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +211,37 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock in practice if we don't hold the lock 
while adding the operation to watch
+    // list and do the final tryComplete() check. For example,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from 
TransactionStateManager (blocked by thread_a)
+    // 5) thread_c holds lock of op (from watch list)
+    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by 
thread_b)
+    // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by 
thread_c)
+    //
+    // Noted that current approach can't prevent all deadlock. For example,
+    // 1) thread_a gets lock of op
+    // 2) thread_a adds op to watch list
+    // 3) thread_a calls op#tryComplete (and it requires lock_b)
+    // 4) thread_b holds lock_b
+    // 5) thread_b sees op from watch list
+    // 6) thread_b needs lock of op
+    // The above story produces a deadlock but it is not an issue in 
production yet since there is no reason for the

Review comment:
       Perhaps we could change this comment to something like the following.
   
   To avoid the above scenario, we recommend 
DelayedOperationPurgatory.checkAndComplete() be called without holding any 
lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed 
operations asynchronously, holding a lock to make the call is often unnecessary.
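
One way to follow this recommendation, sketched under assumptions: record the completion check as a deferred action while the lock is held, and run it only after the lock has been released. `DeferredActions`, `DeferredActionsDemo`, and `groupLock` are hypothetical names for illustration, not Kafka APIs.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical queue of actions to run once the caller holds no locks.
final class DeferredActions {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  // Drains the queue and returns how many actions ran.
  // Intended to be called only when the current thread holds no locks.
  def runAll(): Int = {
    var ran = 0
    var action = queue.poll()
    while (action != null) {
      action()
      ran += 1
      action = queue.poll()
    }
    ran
  }
}

object DeferredActionsDemo {
  // Returns whether the lock was still held when the deferred action ran.
  def lockHeldWhileRunning(): Boolean = {
    val groupLock = new ReentrantLock() // stands in for e.g. a group lock
    val actions = new DeferredActions
    var heldDuringAction = true

    groupLock.lock()
    try {
      // Mutate state under the lock, but only schedule the completion check.
      // In real code the action would call something like checkAndComplete(key).
      actions.add(() => { heldDuringAction = groupLock.isHeldByCurrentThread() })
    } finally groupLock.unlock()

    actions.runAll() // executed after the lock has been released
    heldDuringAction
  }
}
```

The design choice here is that the thread that changes state and the code that completes delayed operations are decoupled by the queue, so the completion check never runs while that thread still holds its own lock.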

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##########
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
         }
       }
       val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-      watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+      watchKeys ++= producerRequestKeys

Review comment:
       Is this change still necessary now that we always call tryComplete() 
while holding the lock in tryCompleteElseWatch()?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

