junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r441846005



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests true: the delayed requests may be completed inside the call with the expectation
+   *                                that no conflicting locks are held by the caller. Otherwise, the caller is expected
+   *                                to complete delayed requests for those returned partitions if there is no
+   *                                completeDelayedRequests passed in.

Review comment:
      if there is no completeDelayedRequests passed in => if completeDelayedRequests is false
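
      For context, a minimal sketch of the calling contract this doc comment describes; the completion hook tryCompleteDelayedRequests is a hypothetical name, not taken from the PR:

          // The caller holds a lock that conflicts with delayed-request completion,
          // so it opts out of in-call completion and finishes the work itself.
          val appendInfo = partition.appendRecordsToLeader(records, origin, requiredAcks,
            completeDelayedRequests = false)
          stateLock.unlock()                      // release the conflicting lock first
          partition.tryCompleteDelayedRequests()  // hypothetical completion hook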

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -560,19 +603,27 @@ class ReplicaManager(val config: KafkaConfig,
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * @param completeDelayedRequests true: the delayed requests may be completed inside the call with the expectation
+   *                                that no conflicting locks are held by the caller. Otherwise, the caller is expected
+   *                                to complete delayed requests for those returned partitions if there is no
+   *                                completeDelayedRequests passed in.

Review comment:
      if there is no completeDelayedRequests passed in => if completeDelayedRequests is false
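
      At the ReplicaManager level the doc comment says the append returns the partitions whose delayed requests remain; a hedged sketch of the caller-side completion (parameter order and helper names are assumptions, not from the PR):

          // With completeDelayedRequests = false, the caller completes the
          // delayed requests for the returned partitions once its locks are gone.
          val affectedPartitions = replicaManager.appendRecords(timeout, requiredAcks,
            internalTopicsAllowed, origin, entriesPerPartition, responseCallback,
            completeDelayedRequests = false)
          groupLock.unlock()  // drop the lock that made in-call completion unsafe
          affectedPartitions.foreach(_.tryCompleteDelayedRequests())  // hypothetical hook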

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -106,25 +97,56 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  private var producerIdCount = 0L
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val lock = new ReentrantLock()
+    val producerId = producerIdCount
+    producerIdCount += 1

Review comment:
      I think the intention for the test is probably to use the same producerId, since the test is mostly about transactional conflicts.
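
      One way to apply this suggestion, sketched (the constant is a placeholder, not the author's change):

          // Share one producerId across all groups so the concurrent transactional
          // operations actually contend, instead of incrementing per group.
          private val sharedProducerId = 1000L

          // inside groupMembers(...), replacing the producerIdCount increment:
          val producerId = sharedProducerId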

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -278,37 +302,48 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
+  /**
+   * @param needLock true to make CompleteTxnOperation happen after CommitTxnOffsetsOperation. It presents error when
+   *                 both CommitTxnOffsetsOperation and CompleteTxnOperation are executed on parallel.

Review comment:
      Perhaps change the comment to something like the following?

      "Setting to true to make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap. Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset."
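
      A hedged sketch of what the flag implies (structure assumed, not copied from the PR): when needLock is true, both operations acquire the member's shared txnLock, so the commit and the completion cannot interleave:

          import java.util.concurrent.locks.Lock

          // Both operations call this with the same member.txnLock (a ReentrantLock
          // per the diff above), so needLock = true serializes them.
          def runWithOptionalLock(member: GroupMember, needLock: Boolean)(body: => Unit): Unit = {
            val lock: Option[Lock] = if (needLock) Some(member.txnLock) else None
            lock.foreach(_.lock())
            try body
            finally lock.foreach(_.unlock())
          }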

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -106,25 +97,56 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  private var producerIdCount = 0L
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val lock = new ReentrantLock()
+    val producerId = producerIdCount
+    producerIdCount += 1
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      producerId = producerId,
+      txnLock = lock))
+    allGroupMembers ++= members
+    members
   }
 
+  def createGroupMembers(groupPrefix: String): Set[GroupMember] =
+    (0 until nGroups).flatMap(i => groupMembers(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator)).toSet
+
+
   @Test
   def testConcurrentGoodPathSequence(): Unit = {
     verifyConcurrentOperations(createGroupMembers, allOperations)
   }
 
   @Test
   def testConcurrentTxnGoodPathSequence(): Unit = {
-    verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn)
+    verifyConcurrentOperations(createGroupMembers, Seq(
+      new JoinGroupOperation,
+      new SyncGroupOperation,
+      new OffsetFetchOperation,
+      new CommitTxnOffsetsOperation(needLock = false),
+      new CompleteTxnOperation(needLock = false),

Review comment:
      Hmm, why don't we need the lock here since CommitTxnOffsetsOperation and CompleteTxnOperation could still run in parallel?
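
      A toy model of the interleaving behind this question, based on the earlier comment about a pending offsetAndMetadata without an appendedBatchOffset (names and structure are illustrative only, not the coordinator's actual code):

          import java.util.concurrent.locks.ReentrantLock

          object TxnRaceSketch {
            val txnLock = new ReentrantLock()
            @volatile var pending = false                          // pending offsetAndMetadata stored
            @volatile var appendedBatchOffset: Option[Long] = None

            // CommitTxnOffsetsOperation: stores the pending entry; only the later
            // append callback fills in the batch offset.
            def commitTxnOffsets(): Unit = {
              txnLock.lock()
              try {
                pending = true
                appendedBatchOffset = Some(42L)                    // append callback
              } finally txnLock.unlock()
            }

            // CompleteTxnOperation: under the lock, a pending entry always has its
            // batch offset; without the lock it could observe the half-done state.
            def completeTxn(): Unit = {
              txnLock.lock()
              try {
                if (pending) assert(appendedBatchOffset.isDefined)
              } finally txnLock.unlock()
            }
          }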



