dajac commented on code in PR #18647: URL: https://github.com/apache/kafka/pull/18647#discussion_r1922706349
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2146,101 +2144,6 @@ class ReplicaManagerTest { } } - @Test - def testClearFetchPurgatoryOnStopReplica(): Unit = { - // As part of a reassignment, we may send StopReplica to the old leader. - // In this case, we should ensure that pending purgatory operations are cancelled - // immediately rather than sitting around to timeout. - - val mockTimer = new MockTimer(time) - val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) - - try { - val tp0 = new TopicPartition(topic, 0) - val tidp0 = new TopicIdPartition(topicId, tp0) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) - - val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, - Optional.of(1)) - val fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) - assertFalse(fetchResult.hasFired) - when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) - - // We have a fetch in purgatory, now receive a stop replica request and - // assert that the fetch returns with a NOT_LEADER error - replicaManager.stopReplicas(2, 0, 0, - mutable.Map(tp0 -> new StopReplicaPartitionState() - .setPartitionIndex(tp0.partition) - .setDeletePartition(true) - .setLeaderEpoch(LeaderAndIsr.EPOCH_DURING_DELETE))) - - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) - } finally { - replicaManager.shutdown(checkpointHW = false) - } - } - - @Test - def testClearProducePurgatoryOnStopReplica(): Unit = { Review Comment: This one is covered by testDeltaToFollowerCompletesProduce. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org