ahuang98 commented on code in PR #19800: URL: https://github.com/apache/kafka/pull/19800#discussion_r2130040186
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java: ########## @@ -149,4 +153,327 @@ void testReplicationOfHigherPartitionLeaderEpoch() throws Exception { // Check that only the first batch was appended because the second batch has a greater epoch assertEquals(oldLogEndOffset + numberOfRecords, context.log.endOffset().offset()); } + + @Test + void testHighWatermarkSentInFetchRequest() throws Exception { + int epoch = 2; + int localId = KafkaRaftClientTest.randomReplicaId(); + ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true); + ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true); + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local.id(), + local.directoryId().get() + ) + .appendToLog(epoch, List.of("a", "b", "c")) + .appendToLog(epoch, List.of("d", "e", "f")) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, electedLeader.id()) + .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL) + .build(); + + var localLogEndOffset = context.log.endOffset().offset(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData( + fetchRequest, + epoch, + localLogEndOffset, + epoch, + OptionalLong.empty() + ); + + // Set the HWM to the LEO + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse( + epoch, + electedLeader.id(), + MemoryRecords.EMPTY, + localLogEndOffset, + Errors.NONE + ) + ); + + context.pollUntilRequest(); + fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData( + fetchRequest, + epoch, + localLogEndOffset, + epoch, + OptionalLong.of(localLogEndOffset) + ); + } + + @Test + void testDefaultHwmDeferred() throws Exception { + var epoch = 2; + var local = KafkaRaftClientTest.replicaKey( + 
KafkaRaftClientTest.randomReplicaId(), + true + ); + var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true); + var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true); + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local.id(), + local.directoryId().get() + ) + .appendToLog(epoch, List.of("a", "b", "c")) + .appendToLog(epoch, List.of("d", "e", "f")) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1 + ) + .withUnknownLeader(epoch) + .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL) + .build(); + + context.unattachedToLeader(); + epoch = context.currentEpoch(); + + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); + + var localLogEndOffset = context.log.endOffset().offset(); + var lastFetchedEpoch = context.log.lastFetchedEpoch(); + context.deliverRequest( + context.fetchRequest( + epoch, + remote, + localLogEndOffset, + lastFetchedEpoch, + Integer.MAX_VALUE + ) + ); + + // Check that the fetch response was deferred + for (var i = 0; i < 10; ++i) { Review Comment: Does polling in this loop tell us much about the response being deferred if the mock time isn't progressing between polls?
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java: ########## @@ -149,4 +153,327 @@ void testReplicationOfHigherPartitionLeaderEpoch() throws Exception { // Check that only the first batch was appended because the second batch has a greater epoch assertEquals(oldLogEndOffset + numberOfRecords, context.log.endOffset().offset()); } + + @Test + void testHighWatermarkSentInFetchRequest() throws Exception { + int epoch = 2; + int localId = KafkaRaftClientTest.randomReplicaId(); + ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true); + ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true); + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local.id(), + local.directoryId().get() + ) + .appendToLog(epoch, List.of("a", "b", "c")) + .appendToLog(epoch, List.of("d", "e", "f")) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, electedLeader.id()) + .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL) + .build(); + + var localLogEndOffset = context.log.endOffset().offset(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData( + fetchRequest, + epoch, + localLogEndOffset, + epoch, + OptionalLong.empty() + ); + + // Set the HWM to the LEO + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse( + epoch, + electedLeader.id(), + MemoryRecords.EMPTY, + localLogEndOffset, + Errors.NONE + ) + ); + + context.pollUntilRequest(); + fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData( + fetchRequest, + epoch, + localLogEndOffset, + epoch, + OptionalLong.of(localLogEndOffset) + ); + } + + @Test + void testDefaultHwmDeferred() throws Exception { + var epoch = 2; + var local = KafkaRaftClientTest.replicaKey( + 
KafkaRaftClientTest.randomReplicaId(), + true + ); + var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true); + var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true); + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local.id(), + local.directoryId().get() + ) + .appendToLog(epoch, List.of("a", "b", "c")) + .appendToLog(epoch, List.of("d", "e", "f")) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1 + ) + .withUnknownLeader(epoch) + .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL) + .build(); + + context.unattachedToLeader(); + epoch = context.currentEpoch(); + + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); + + var localLogEndOffset = context.log.endOffset().offset(); + var lastFetchedEpoch = context.log.lastFetchedEpoch(); + context.deliverRequest( + context.fetchRequest( + epoch, + remote, + localLogEndOffset, + lastFetchedEpoch, + Integer.MAX_VALUE + ) + ); + + // Check that the fetch response was deferred + for (var i = 0; i < 10; ++i) { + context.client.poll(); + assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH)); + } + } + + @Test + void testUnknownHwmDeferredWhenLeaderUnknowsHwm() throws Exception { Review Comment: In the test name, instead of "LeaderUnknows", maybe use "LeaderDoesNotKnow"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org