junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r442330606



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  /**
+   * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap.
+   * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset.
+   */
+  private val txnLock = new ReentrantLock
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      // same producerId to tests more on transactional conflicts.
+      producerId = 1000,
+      txnLock = txnLock))
+    allGroupMembers ++= members

Review comment:
       Since createGroupMembers() is called in multiple tests, it seems allGroupMembers will keep accumulating members across tests. Is that intended?





