jsancio commented on code in PR #19854:
URL: https://github.com/apache/kafka/pull/19854#discussion_r2121590408


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2145,7 +2186,20 @@ public void 
testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp
         fetchRequest = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
+        
assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));
         context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.NOT_LEADER_OR_FOLLOWER)
+        );
+
+        context.client.poll();
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest.destination().id());
+        
assertFalse(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));

Review Comment:
   Let's remove this line. This suite should test the protocol and not the 
internal implementation state.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2145,7 +2186,20 @@ public void 
testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp
         fetchRequest = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
+        
assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));

Review Comment:
   Let's not test internal KRaft state. This suite should check the protocol.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void 
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
         assertTrue(context.client.quorum().isFollower());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int leaderNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .toList();
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withElectedLeader(epoch, leaderNodeId)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        // after fetch timeout, node will become prospective with leader
+        context.time.sleep(context.fetchTimeoutMs);
+        context.client.poll();
+        assertTrue(context.client.quorum().isProspective());
+        assertEquals(leaderNodeId, 
context.client.quorum().leaderId().getAsInt());

Review Comment:
   You can use `RaftClientTestContext#assertElectedLeader(int, int)`.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void 
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
         assertTrue(context.client.quorum().isFollower());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int leaderNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .toList();
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withElectedLeader(epoch, leaderNodeId)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        // after fetch timeout, node will become prospective with leader
+        context.time.sleep(context.fetchTimeoutMs);
+        context.client.poll();
+        assertTrue(context.client.quorum().isProspective());
+        assertEquals(leaderNodeId, 
context.client.quorum().leaderId().getAsInt());
+        assertEquals(epoch, context.currentEpoch());
+
+        // after election loss node will become follower because it had a last 
known leader
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.client.poll();
+        assertTrue(context.client.quorum().isFollower());
+        assertEquals(leaderNodeId, 
context.client.quorum().leaderId().getAsInt());
+        assertEquals(epoch, context.currentEpoch());
+
+        // node will send fetch request to leader
+        context.pollUntilRequest();

Review Comment:
   Why not call `pollUntilRequest` and check that the request's destination is 
the leader and its epoch is `epoch`?
   
   We should try to avoid testing internal KRaft state in protocol suites like 
`KafkaRaftClientTest`.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void 
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
         assertTrue(context.client.quorum().isFollower());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int leaderNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .toList();
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withElectedLeader(epoch, leaderNodeId)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        // after fetch timeout, node will become prospective with leader
+        context.time.sleep(context.fetchTimeoutMs);
+        context.client.poll();
+        assertTrue(context.client.quorum().isProspective());

Review Comment:
   Same comment here. The test should instead expect VOTE requests with 
pre-vote set to true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to