dajac commented on a change in pull request #11942: URL: https://github.com/apache/kafka/pull/11942#discussion_r835364568
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) Review comment: Using this method creates a lot of unnecessary noise in my opinion. Nowadays, we tend to use `setupReplicaManagerWithMockedPurgatories` which is simpler. `setupReplicaManagerWithMockedPurgatories` does not support racks though. I suppose that we could make it accept an optional Map from broker id to rack. Could we try using this one instead? There are many examples in this file. 
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower Review comment: I suppose the replica is a leader, no? 
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val 
consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1, + clientMetadata = Some(metadata)) Review comment: Could we also explicitly set the timeout? I would also add a small comment explaining why we set min bytes and max wait time. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, 
leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1, + clientMetadata = Some(metadata)) + + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) + + // Returns a preferred replica (should just be the leader, which is None) Review comment: I suppose that the comment is not correct, isn't it? ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new 
LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1, + clientMetadata = Some(metadata)) + + // Fetch from follower succeeds Review comment: I suppose that the comment is also not correct here, isn't it? ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new 
TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", Review comment: nit: Is specifying `ClientMetadata` required? We usually omit the types in Scala unless required. 
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) Review comment: Why do we need this? Does the replica selector ignore the replica if it is not caught-up? It might be worth adding a small comment. 
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1300,67 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testHasPreferredReplica(): Unit = { + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props) + + try { + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + + initializeLogAndTopicId(replicaManager, tp0, topicId) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val 
consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1, + clientMetadata = Some(metadata)) + + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) + + // Returns a preferred replica (should just be the leader, which is None) + assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined) Review comment: Could we also verify that the fetch purgatory was not touched? I want to verify that no delayed fetch was inserted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org