junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r442330606
##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }

-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  /**
+   * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap.
+   * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset.
+   */
+  private val txnLock = new ReentrantLock
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      // same producerId to tests more on transactional conflicts.
+      producerId = 1000,
+      txnLock = txnLock))
+    allGroupMembers ++= members

Review comment:
Since createGroupMembers() is called in multiple tests, it seems we will be accumulating allGroupMembers across tests. That seems unexpected?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org