jsancio commented on code in PR #19854: URL: https://github.com/apache/kafka/pull/19854#discussion_r2121590408
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2145,7 +2186,20 @@ public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp fetchRequest = context.assertSentFetchRequest(); assertNotEquals(leaderId, fetchRequest.destination().id()); assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id())); + assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds())); context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NOT_LEADER_OR_FOLLOWER) + ); + + context.client.poll(); + context.pollUntilRequest(); + fetchRequest = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest.destination().id()); + assertFalse(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds())); Review Comment: Let's remove this line. This suite should test the protocol and not the internal implementation state. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2145,7 +2186,20 @@ public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp fetchRequest = context.assertSentFetchRequest(); assertNotEquals(leaderId, fetchRequest.destination().id()); assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id())); + assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds())); Review Comment: Let's not test internal KRaft state. This suite should check the protocol. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2033,6 +2033,47 @@ public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit assertTrue(context.client.quorum().isFollower()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int leaderNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .toList(); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withElectedLeader(epoch, leaderNodeId) + .withKip853Rpc(withKip853Rpc) + .build(); + + // after fetch timeout, node will become prospective with leader + context.time.sleep(context.fetchTimeoutMs); + context.client.poll(); + assertTrue(context.client.quorum().isProspective()); + assertEquals(leaderNodeId, context.client.quorum().leaderId().getAsInt()); Review Comment: You can use `RaftClientTestContext#assertElectedLeader(int, int)`. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2033,6 +2033,47 @@ public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit assertTrue(context.client.quorum().isFollower()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int leaderNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .toList(); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withElectedLeader(epoch, leaderNodeId) + .withKip853Rpc(withKip853Rpc) + .build(); + + // after fetch timeout, node will become prospective with leader + context.time.sleep(context.fetchTimeoutMs); + context.client.poll(); + assertTrue(context.client.quorum().isProspective()); + assertEquals(leaderNodeId, context.client.quorum().leaderId().getAsInt()); + assertEquals(epoch, context.currentEpoch()); + + // after election loss node will become follower because it had a last known leader + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + assertTrue(context.client.quorum().isFollower()); + assertEquals(leaderNodeId, context.client.quorum().leaderId().getAsInt()); + assertEquals(epoch, context.currentEpoch()); + + // node will send fetch request to leader + context.pollUntilRequest(); Review Comment: Why not `pollUntilRequest` and check that the destination is the leader and the epoch is `epoch`? We should try to avoid testing internal KRaft state in protocol suites like `KafkaRaftClientTest`. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2033,6 +2033,47 @@ public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit assertTrue(context.client.quorum().isFollower()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int leaderNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .toList(); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withElectedLeader(epoch, leaderNodeId) + .withKip853Rpc(withKip853Rpc) + .build(); + + // after fetch timeout, node will become prospective with leader + context.time.sleep(context.fetchTimeoutMs); + context.client.poll(); + assertTrue(context.client.quorum().isProspective()); Review Comment: Same comment here. The test should instead expect VOTE requests with pre-vote set to true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org