ahuang98 commented on code in PR #19800:
URL: https://github.com/apache/kafka/pull/19800#discussion_r2130040186


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -149,4 +153,327 @@ void testReplicationOfHigherPartitionLeaderEpoch() throws 
Exception {
         // Check that only the first batch was appended because the second 
batch has a greater epoch
         assertEquals(oldLogEndOffset + numberOfRecords, 
context.log.endOffset().offset());
     }
+
+    @Test
+    void testHighWatermarkSentInFetchRequest() throws Exception {
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.empty()
+        );
+
+        // Set the HWM to the LEO
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(
+                epoch,
+                electedLeader.id(),
+                MemoryRecords.EMPTY,
+                localLogEndOffset,
+                Errors.NONE
+            )
+        );
+
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.of(localLogEndOffset)
+        );
+    }
+
+    @Test
+    void testDefaultHwmDeferred() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {

Review Comment:
   does this mean much if mock time isn't progressing?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -149,4 +153,327 @@ void testReplicationOfHigherPartitionLeaderEpoch() throws 
Exception {
         // Check that only the first batch was appended because the second 
batch has a greater epoch
         assertEquals(oldLogEndOffset + numberOfRecords, 
context.log.endOffset().offset());
     }
+
+    @Test
+    void testHighWatermarkSentInFetchRequest() throws Exception {
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.empty()
+        );
+
+        // Set the HWM to the LEO
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(
+                epoch,
+                electedLeader.id(),
+                MemoryRecords.EMPTY,
+                localLogEndOffset,
+                Errors.NONE
+            )
+        );
+
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.of(localLogEndOffset)
+        );
+    }
+
+    @Test
+    void testDefaultHwmDeferred() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {
+            context.client.poll();
+            assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+        }
+    }
+
+    @Test
+    void testUnknownHwmDeferredWhenLeaderUnknowsHwm() throws Exception {

Review Comment:
   instead of "LeaderUnknows" maybe "LeaderDoesNotKnow"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to