junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r441846005
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########

@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests true: the delayed requests may be completed inside the call with the expectation
+   *        that no conflicting locks are held by the caller. Otherwise, the caller is expected
+   *        to complete delayed requests for those returned partitions if there is no
+   *        completeDelayedRequests passed in.

Review comment:
   if there is no completeDelayedRequests passed in => if completeDelayedRequests is false


##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########

@@ -560,19 +603,27 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * @param completeDelayedRequests true: the delayed requests may be completed inside the call with the expectation
+   *        that no conflicting locks are held by the caller. Otherwise, the caller is expected
+   *        to complete delayed requests for those returned partitions if there is no
+   *        completeDelayedRequests passed in.

Review comment:
   if there is no completeDelayedRequests passed in => if completeDelayedRequests is false


##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########

@@ -106,25 +97,56 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  private var producerIdCount = 0L
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val lock = new ReentrantLock()
+    val producerId = producerIdCount
+    producerIdCount += 1

Review comment:
   I think the intention for the test is probably to use the same producerId, since it focuses more on transactional conflicts.


##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########

@@ -278,37 +302,48 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
+  /**
+   * @param needLock true to make CompleteTxnOperation happen after CommitTxnOffsetsOperation. It presents error when
+   *        both CommitTxnOffsetsOperation and CompleteTxnOperation are executed on parallel.

Review comment:
   Perhaps change the comment to something like the following? "Setting to true to make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap. Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset."
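
   The hazard that this suggested wording describes is easier to see with a concrete interleaving. Below is a rough, self-contained Scala sketch, not the PR's code: `TxnLockSketch`, `PendingTxnOffset`, and the field types are invented stand-ins for the coordinator's per-group state. It shows why sharing one `ReentrantLock` between the commit and completion paths keeps the two-step commit atomic, so the completion path never observes a pending offsetAndMetadata without an appendedBatchOffset:

```scala
import java.util.concurrent.locks.ReentrantLock

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object TxnLockSketch {

  // Toy stand-in for the group's pending transactional offset state; the real state lives in
  // GroupMetadata, and these field names are illustrative only.
  final class PendingTxnOffset {
    @volatile var offsetAndMetadata: Option[Long] = None   // written first by the "commit" step
    @volatile var appendedBatchOffset: Option[Long] = None // written second, after the simulated append
  }

  private def withOptionalLock[T](lock: ReentrantLock, useLock: Boolean)(body: => T): T =
    if (useLock) { lock.lock(); try body finally lock.unlock() } else body

  // Models the commit side: two separate writes that should appear atomic to readers.
  def commit(state: PendingTxnOffset, lock: ReentrantLock, useLock: Boolean): Unit =
    withOptionalLock(lock, useLock) {
      state.offsetAndMetadata = Some(42L)
      Thread.sleep(1) // widen the window between the two writes
      state.appendedBatchOffset = Some(100L)
    }

  // Models the completion side: the bad interleaving is observing offsetAndMetadata
  // while appendedBatchOffset is still empty.
  def complete(state: PendingTxnOffset, lock: ReentrantLock, useLock: Boolean): Boolean =
    withOptionalLock(lock, useLock) {
      !(state.offsetAndMetadata.isDefined && state.appendedBatchOffset.isEmpty)
    }

  def main(args: Array[String]): Unit =
    Seq(false, true).foreach { useLock =>
      val state = new PendingTxnOffset
      val lock = new ReentrantLock()
      val committer = Future(commit(state, lock, useLock))
      val observer = Future(complete(state, lock, useLock))
      // With useLock = true the observer can only ever see "nothing yet" or "both fields set";
      // with useLock = false it may, timing permitting, catch the half-written state.
      println(s"useLock=$useLock, observer saw a consistent state: ${Await.result(observer, 5.seconds)}")
      Await.result(committer, 5.seconds)
    }
}
```

   Running it prints one line per mode; only the unlocked mode can report an inconsistent observation, which is the situation the suggested comment warns about.
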
##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########

@@ -106,25 +97,56 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  private var producerIdCount = 0L
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val lock = new ReentrantLock()
+    val producerId = producerIdCount
+    producerIdCount += 1
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      producerId = producerId,
+      txnLock = lock))
+    allGroupMembers ++= members
+    members
   }
 
+  def createGroupMembers(groupPrefix: String): Set[GroupMember] =
+    (0 until nGroups).flatMap(i => groupMembers(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator)).toSet
+
   @Test
   def testConcurrentGoodPathSequence(): Unit = {
     verifyConcurrentOperations(createGroupMembers, allOperations)
   }
 
   @Test
   def testConcurrentTxnGoodPathSequence(): Unit = {
-    verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn)
+    verifyConcurrentOperations(createGroupMembers, Seq(
+      new JoinGroupOperation,
+      new SyncGroupOperation,
+      new OffsetFetchOperation,
+      new CommitTxnOffsetsOperation(needLock = false),
+      new CompleteTxnOperation(needLock = false),

Review comment:
   Hmm, why don't we need the lock here, since CommitTxnOffsetsOperation and CompleteTxnOperation could still run in parallel?
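
   If the answer is that the lock is in fact still needed, a hypothetical fix would be to flip the two flags in the quoted sequence. This is only a fragment of the quoted test, not standalone code, and the operations truncated by the diff above are left elided:

```scala
    verifyConcurrentOperations(createGroupMembers, Seq(
      new JoinGroupOperation,
      new SyncGroupOperation,
      new OffsetFetchOperation,
      // Hypothetical change implied by the question: give both transactional operations the
      // shared lock so CommitTxnOffsetsOperation and CompleteTxnOperation cannot interleave.
      new CommitTxnOffsetsOperation(needLock = true),
      new CompleteTxnOperation(needLock = true),
      // ... remaining operations as in the PR (truncated in the diff above) ...
```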