junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479383807



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *                            this field is updated after appending record so it has default value option.

Review comment:
       The default value is None.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -36,8 +36,14 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  override def onExpiration(): Unit = {
+    coordinator.onExpireJoin()
+    // try to complete delayed actions introduced by coordinator.onCompleteJoin
+    tryToCompleteDelayedAction()
+  }
+  override def onComplete(): Unit = coordinator.onCompleteJoin(group)
+
+  protected def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteDelayedAction()

Review comment:
       This can just be private.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -110,31 +109,21 @@ abstract class DelayedOperation(override val delayMs: Long,
    * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
    * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
    * the operation is actually completed.
+   *

Review comment:
       The above comment is outdated now.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -558,10 +558,27 @@ class ReplicaManager(val config: KafkaConfig,
     localLog(topicPartition).map(_.parentDir)
   }
 
+  // visible for testing
+  val delayedActions = new LinkedBlockingQueue[() => Unit]()
+
+  /**
+   * try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+   * are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle(),
+   * at which point, no conflicting locks will be held.
+   */
+  def tryCompleteDelayedAction(): Unit = {
+    val action = delayedActions.poll()
+    if (action != null) action()
+  }
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations in a queue. All callers to ReplicaManager.appendRecords() are

Review comment:
       in a queue => are stored in a queue

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -110,31 +109,21 @@ abstract class DelayedOperation(override val delayMs: Long,
    * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
    * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
    * the operation is actually completed.
+   *
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() picks up and then execute an action when no lock is held.

Review comment:
       KafkaApis.handle() => KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin)
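
       To illustrate the pattern the comment describes, here is a minimal sketch, not the PR's code. `ActionQueueSketch`, `groupLock` and `appendUnderLock` are hypothetical names; only `delayedActions` and `tryCompleteDelayedAction` mirror the diff. The idea is that completion logic is enqueued while a lock is held and only executed later, by a caller that holds no lock (a request handler or an expiration thread).

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for ReplicaManager: only the action queue is modeled.
object ActionQueueSketch {
  val delayedActions = new LinkedBlockingQueue[() => Unit]()
  val groupLock = new ReentrantLock() // assumed lock, standing in for a group lock

  // Runs while the lock is held: the completion is enqueued, not executed.
  def appendUnderLock(log: StringBuilder): Unit = {
    groupLock.lock()
    try {
      log.append("append;")
      delayedActions.put(() => {
        // By the time this runs, the calling thread should hold no lock.
        assert(!groupLock.isHeldByCurrentThread)
        log.append("complete;")
      })
    } finally groupLock.unlock()
  }

  // Called once no lock is held; polls at most one action, as in the diff.
  def tryCompleteDelayedAction(): Unit = {
    val action = delayedActions.poll()
    if (action != null) action()
  }
}
```

       Each caller that may have enqueued an action is then responsible for calling `tryCompleteDelayedAction()` after releasing its locks, which is why the comment should name every such caller.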

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -558,10 +558,27 @@ class ReplicaManager(val config: KafkaConfig,
     localLog(topicPartition).map(_.parentDir)
   }
 
+  // visible for testing
+  val delayedActions = new LinkedBlockingQueue[() => Unit]()
+
+  /**
+   * try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+   * are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle(),

Review comment:
       at the end of KafkaApis.handle() => at the end of KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin)

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -110,31 +109,21 @@ abstract class DelayedOperation(override val delayMs: Long,
    * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
    * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
    * the operation is actually completed.
+   *
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() picks up and then execute an action when no lock is held.
    */
-  private[server] def maybeTryComplete(): Boolean = {
-    var retry = false
-    var done = false
-    do {
-      if (lock.tryLock()) {
-        try {
-          tryCompletePending.set(false)
-          done = tryComplete()
-        } finally {
-          lock.unlock()
-        }
-        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
-        // `tryCompletePending`. In this case we should retry.
-        retry = tryCompletePending.get()
-      } else {
-        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
-        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
-        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
-        // released the lock and returned by the time the flag is set.
-        retry = !tryCompletePending.getAndSet(true)
-      }
-    } while (!isCompleted && retry)
-    done
-  }
+  private[server] def maybeTryComplete(): Boolean = inLock(lock)(tryComplete())

Review comment:
       We should probably rename this to something like safeTryComplete().
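
       For reference, a minimal sketch of what the renamed method could look like, assuming the semantics stay the same as the new `inLock(lock)(tryComplete())` line. `OperationSketch` is a hypothetical, trimmed-down stand-in for DelayedOperation, and the lock handling is written inline instead of using Kafka's `inLock` helper so the snippet is self-contained.

```scala
import java.util.concurrent.locks.{Lock, ReentrantLock}

// Hypothetical, trimmed-down stand-in for DelayedOperation.
abstract class OperationSketch(lockOpt: Option[Lock] = None) {
  private val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  def tryComplete(): Boolean

  // Blocking acquire rather than tryLock: the caller is expected to hold
  // no conflicting lock, so waiting here should not deadlock.
  def safeTryComplete(): Boolean = {
    lock.lock()
    try tryComplete() finally lock.unlock()
  }
}
```

       The "safe" prefix would signal that the method takes the operation's lock itself, as opposed to `tryComplete()`, which assumes the lock is already held.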



