junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439543737



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
       It's cleaner to not pass in completeDelayedRequests here and let the 
caller (`ReplicaManager.appendRecords()`) check and complete purgatory instead.
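
A minimal, self-contained sketch of that idea (hypothetical simplified stand-ins, not the real ReplicaManager/Partition/purgatory classes): the append path only reports, per partition, whether the leader HW moved, and the caller checks the result and completes the purgatory itself.
```
// All names below are illustrative stand-ins, not the actual Kafka classes.
sealed trait LeaderHWChange
case object LeaderHWIncremented extends LeaderHWChange
case object NoHWChange extends LeaderHWChange

class ProducePurgatory {
  // stand-in for DelayedOperationPurgatory.checkAndComplete(key)
  def checkAndComplete(partition: String): Unit =
    println(s"completing delayed produce requests for $partition")
}

class ReplicaManagerLike(purgatory: ProducePurgatory) {
  // the Partition-side append: reports whether the leader HW moved, never touches purgatory
  private def appendToLeader(partition: String): LeaderHWChange = LeaderHWIncremented

  // the appendRecords()-style caller: checks the result and completes purgatory itself
  def appendRecords(partitions: Seq[String]): Unit =
    partitions.map(p => p -> appendToLeader(p)).foreach {
      case (p, LeaderHWIncremented) => purgatory.checkAndComplete(p)
      case _                        => // HW unchanged, nothing to complete
    }
}

object CallerCompletesPurgatoryDemo extends App {
  new ReplicaManagerLike(new ProducePurgatory).appendRecords(Seq("__consumer_offsets-0"))
}
```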

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1118,33 +1170,38 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
+  /**
+   * remove the pending member and then return the group key whihc is in PreparingRebalance,
+   * @param group group
+   * @param memberId member id
+   * @return group key if it is in PreparingRebalance. Otherwise, None
+   */
+  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Option[GroupKey] = {
     group.removePendingMember(memberId)
 
-    if (group.is(PreparingRebalance)) {
-      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-    }
-  }
-
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group.inLock {
-      if (group.hasAllMembersJoined)
-        forceComplete()
-      else false
-    }
+    if (group.is(PreparingRebalance)) Some(GroupKey(group.groupId))
+    else None
   }
 
   def onExpireJoin(): Unit = {
     // TODO: add metrics for restabilize timeouts
   }
 
-  def onCompleteJoin(group: GroupMetadata): Unit = {
+  /**
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete

Review comment:
       The caller no longer passes in completeDelayedRequests.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,24 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group

Review comment:
       There was => There were

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -3921,22 +3934,26 @@ class GroupCoordinatorTest {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
     val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
+    EasyMock.expect(replicaManager.completeDelayedRequests(EasyMock.anyObject()))

Review comment:
       Hmm, this should only be called with LeaderHWChange.LeaderHWIncremented, 
but the mock later returns LeaderHWChange.None? Ditto below.

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+    /**
+     * holds the group lock for both the "group.hasAllMembersJoined" check and the call to forceComplete()
+     */
+    if (group.hasAllMembersJoined) forceComplete()
+    else false
+  } finally completeDelayedRequests()
+  override def onExpiration(): Unit = coordinator.onExpireJoin()
+  override def onComplete(): Unit = try partitionsToComplete = coordinator.onCompleteJoin(group)
+  finally completeDelayedRequests()
+
+  /**
+   * try to complete delayed requests only if the caller does not hold the group lock.
+   * This method is called by following cases:
+   * 1) tryComplete -> hold lock -> onComplete -> release lock -> completeDelayedRequests
+   * 2) onComplete -> completeDelayedRequests

Review comment:
       expire ->  onComplete -> completeDelayedRequests

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {

Review comment:
       DelayedOperation.lockOpt defaults to None. So, we don't have to specify it explicitly.
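
As a tiny, self-contained illustration of that point (hypothetical simplified class names, not the real DelayedOperation/DelayedJoin): when the superclass parameter already defaults to None, the subclass can simply leave it out.
```
import java.util.concurrent.locks.Lock

// simplified stand-in for DelayedOperation(delayMs, lockOpt), whose lockOpt defaults to None
abstract class DelayedOperationLike(val delayMs: Long, val lockOpt: Option[Lock] = None)

// equivalent to `extends DelayedOperationLike(rebalanceTimeout, None)`,
// without spelling the default out explicitly
class DelayedJoinLike(rebalanceTimeout: Long) extends DelayedOperationLike(rebalanceTimeout)
```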

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.

Review comment:
       "GroupCoordinator#onCompleteJoin() tries to complete delayed requests" 
=> since the completion of the delayed request for partitions returned from 
GroupCoordinator#onCompleteJoin() need to be done outside of the group lock.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -603,6 +650,9 @@ class ReplicaManager(val config: KafkaConfig,
         val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
         responseCallback(produceResponseStatus)
       }
+      localProduceResults
+        .filter { case (_, logAppendResult) => logAppendResult.exception.isEmpty}
+        .map(e => e._1 -> e._2.leaderHWChange)

Review comment:
       Could we use `map {case (tp, appendResult) => ...}` here to avoid using 
unamed references?
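
A small, self-contained sketch of the suggested style, with a hypothetical simplified AppendResult type standing in for the real LogAppendResult: the pattern match names the tuple parts instead of going through `_1`/`_2`.
```
object MapPatternExample extends App {
  // hypothetical, simplified result type used only for this example
  final case class AppendResult(exception: Option[Throwable], leaderHWChange: String)

  val localProduceResults: Map[String, AppendResult] = Map(
    "__consumer_offsets-0" -> AppendResult(None, "LeaderHWIncremented"),
    "__consumer_offsets-1" -> AppendResult(Some(new RuntimeException("append failed")), "None")
  )

  // instead of `.map(e => e._1 -> e._2.leaderHWChange)`:
  val successfulHWChanges: Map[String, String] =
    localProduceResults
      .filter { case (_, appendResult) => appendResult.exception.isEmpty }
      .map { case (tp, appendResult) => tp -> appendResult.leaderHWChange }

  println(successfulHWChanges) // Map(__consumer_offsets-0 -> LeaderHWIncremented)
}
```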

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+    /**
+     * holds the group lock for both the "group.hasAllMembersJoined" check and the call to forceComplete()
+     */
+    if (group.hasAllMembersJoined) forceComplete()
+    else false
+  } finally completeDelayedRequests()
+  override def onExpiration(): Unit = coordinator.onExpireJoin()
+  override def onComplete(): Unit = try partitionsToComplete = coordinator.onCompleteJoin(group)
+  finally completeDelayedRequests()
+
+  /**
+   * try to complete delayed requests only if the caller does not hold the group lock.
+   * This method is called by following cases:
+   * 1) tryComplete -> hold lock -> onComplete -> release lock -> completeDelayedRequests
+   * 2) onComplete -> completeDelayedRequests
+   */
+  private[group] def completeDelayedRequests(): Unit = if (!group.lock.isHeldByCurrentThread) {

Review comment:
       Another way that doesn't require checking lock.isHeldByCurrentThread is 
the following. But your approach seems simpler.
   
   Override forceComplete() to
   ```
   override def forceComplete(): Boolean = {
     if (completed.compareAndSet(false, true)) {
       // cancel the timeout timer
       cancel()
       partitionsToComplete = coordinator.onCompleteJoin(group)
       onComplete()
       true
     } else {
       false
     }
   }
   ```
   In onComplete(), do nothing.
   
   In tryComplete(), do
   ```
   override def tryComplete(): Boolean = {
     var isForceComplete = false
     group.inLock {
       if (group.hasAllMembersJoined)
         isForceComplete = forceComplete()
     }
     completeDelayedRequests(partitionsToComplete)
     isForceComplete
   }
   ```
   
   In onExpiration(),
   ```
   override def onExpiration(): Unit = {
     completeDelayedRequests(partitionsToComplete)
   }
   ```
   

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself 
hasn't been appended to the log")=>

Review comment:
       Hmm, why do we need this logic now?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -311,37 +317,47 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
-
+        appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests)
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
+        Map.empty
     }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
+   */
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
-                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+                             callback: Map[TopicPartition, PartitionResponse] => Unit,
+                             completeDelayedRequests: Boolean): Map[TopicPartition, LeaderHWChange] = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       origin = AppendOrigin.Coordinator,
+      completeDelayedRequests = completeDelayedRequests,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
+   * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                   completeDelayedRequests: Boolean,

Review comment:
       All callers pass in completeDelayedRequests as false. Could we remove 
this param?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1118,33 +1170,38 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
+  /**
+   * remove the pending member and then return the group key whihc is in PreparingRebalance,

Review comment:
       typo whihc

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
       may requires => may require

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,24 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * We introduces a flag in ReplicaManager.appendRecords()., called completeDelayedRequests, to prevent the method

Review comment:
        ReplicaManager.appendRecords()., => ReplicaManager.appendRecords(),

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##########
@@ -536,6 +537,11 @@ class GroupCoordinatorTest {
     // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
     assertEquals(1, group.size)
 
+    // prepare the mock replica manager again since the delayed join is going to complete
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()

Review comment:
       Hmm, why do we need to mock this since replicaManager.getMagic() is only 
called through replicaManager.handleWriteTxnMarkersRequest()?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

