jsancio commented on code in PR #17352:
URL: https://github.com/apache/kafka/pull/17352#discussion_r1876361982


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3137,12 +3139,12 @@ private long pollUnattachedAsVoter(UnattachedState 
state, long currentTimeMs) {
             transitionToCandidate(currentTimeMs);
             return 0L;
         } else {
-            return state.remainingElectionTimeMs(currentTimeMs);
+            return pollUnattachedAsObserver(state, currentTimeMs);

Review Comment:
   This is correct, but the naming is odd: `pollUnattachedAsVoter` calls 
`pollUnattachedAsObserver`. Maybe `pollUnattachedCommon` would be a better 
name for the shared method.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2628,6 +2697,100 @@ public void 
testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc)
         context.assertElectedLeader(epoch, voter3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest1.correlationId(),
+            fetchRequest1.destination(),
+            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, 
Errors.BROKER_NOT_AVAILABLE)
+        );
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        Errors error = fetchRequest2.destination().id() == leaderId ?
+            Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
+        context.deliverResponse(
+            fetchRequest2.correlationId(),
+            fetchRequest2.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
error)
+        );
+        context.client.poll();
+
+        context.assertElectedLeader(epoch, leaderId);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.time.sleep(context.requestTimeoutMs());
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest2.correlationId(),
+            fetchRequest2.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.FENCED_LEADER_EPOCH)
+        );
+        context.client.poll();
+
+        context.assertElectedLeader(epoch, leaderId);

Review Comment:
   Similar comment here. The leader didn't change, so you can't tell whether this 
state comes from the FETCH response for `fetchRequest2` or was already the 
existing state of the replica.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2628,6 +2697,100 @@ public void 
testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc)
         context.assertElectedLeader(epoch, voter3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest1.correlationId(),
+            fetchRequest1.destination(),
+            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, 
Errors.BROKER_NOT_AVAILABLE)
+        );
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        Errors error = fetchRequest2.destination().id() == leaderId ?
+            Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;

Review Comment:
   Wouldn't this always be `NOT_LEADER_OR_FOLLOWER`, since the 
`assertNotEquals(leaderId, ...)` check above already passed?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2628,6 +2697,100 @@ public void 
testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc)
         context.assertElectedLeader(epoch, voter3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest1.correlationId(),
+            fetchRequest1.destination(),
+            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, 
Errors.BROKER_NOT_AVAILABLE)
+        );
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        Errors error = fetchRequest2.destination().id() == leaderId ?
+            Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
+        context.deliverResponse(
+            fetchRequest2.correlationId(),
+            fetchRequest2.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
error)
+        );
+        context.client.poll();
+
+        context.assertElectedLeader(epoch, leaderId);

Review Comment:
   Okay, but technically the leader didn't change, so you can't tell whether this 
state comes from the FETCH response for `fetchRequest2` or was already the 
existing state of the replica.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3057,28 +3057,30 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
 
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
-            return maybeSendAnyVoterFetch(currentTimeMs);
+            return maybeSendAnyBootstrapFetch(currentTimeMs);
         } else {
-            final long backoffMs;
-
-            // If the current leader is backing off due to some failure or if 
the
-            // request has timed out, then we attempt to send the Fetch to 
another
-            // voter in order to discover if there has been a leader change.
-            Node leaderNode = state.leaderNode(channel.listenerName());
-            if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
-                // Once the request has timed out backoff the connection
-                requestManager.reset(leaderNode);
-                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
-            } else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) 
{
-                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
-            } else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) {
-                backoffMs = maybeSendFetchOrFetchSnapshot(state, 
currentTimeMs);
-            } else {
-                backoffMs = 
requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
-            }
+            return maybeSendFetchToQuorum(state, currentTimeMs);
+        }
+    }
 
-            return Math.min(backoffMs, 
state.remainingFetchTimeMs(currentTimeMs));
+    private long maybeSendFetchToQuorum(FollowerState state, long 
currentTimeMs) {

Review Comment:
   How about `maybeSendFetchToBestNode`? The best node is the leader, but if the 
leader is unavailable, the request is sent to any of the bootstrap servers.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2755,7 +2755,7 @@ private FetchRequestData buildFetchRequest() {
             .setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
     }
 
-    private long maybeSendAnyVoterFetch(long currentTimeMs) {
+    private long maybeSendAnyBootstrapFetch(long currentTimeMs) {

Review Comment:
   How about `maybeSendToAnyBootstrap`?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2628,6 +2697,100 @@ public void 
testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc)
         context.assertElectedLeader(epoch, voter3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest1.correlationId(),
+            fetchRequest1.destination(),
+            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, 
Errors.BROKER_NOT_AVAILABLE)
+        );
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        Errors error = fetchRequest2.destination().id() == leaderId ?
+            Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
+        context.deliverResponse(
+            fetchRequest2.correlationId(),
+            fetchRequest2.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
error)
+        );
+        context.client.poll();
+
+        context.assertElectedLeader(epoch, leaderId);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.time.sleep(context.requestTimeoutMs());
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest2.correlationId(),
+            fetchRequest2.destination(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.FENCED_LEADER_EPOCH)

Review Comment:
   Is `FENCED_LEADER_EPOCH` an accurate error here? Technically, a replica 
returns this error only if the sent epoch is less than the epoch known by the 
receiving replica. That is not the case in this test, since the 
returned epoch is the same as the sent epoch.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1780,6 +1780,75 @@ public void 
testUnattachedAsObserverDoesNotBecomeCandidateAfterElectionTimeout(b
         assertEquals(0, context.channel.drainSendQueue().size());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) 
throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int leaderNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, leaderNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0L, 0);
+        assertTrue(context.client.quorum().isUnattached());
+        assertTrue(context.client.quorum().isVoter());
+
+        // receives a fetch response specifying who the leader is
+        Errors responseError = (request.destination().id() == otherNodeId) ? 
Errors.NOT_LEADER_OR_FOLLOWER : Errors.NONE;
+        context.deliverResponse(
+            request.correlationId(),
+            request.destination(),
+            context.fetchResponse(epoch, leaderNodeId, MemoryRecords.EMPTY, 
0L, responseError)
+        );
+
+        context.client.poll();
+        assertTrue(context.client.quorum().isFollower());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testUnattachedAsVoterCanBecomeCandidate(boolean withKip853Rpc) 
throws Exception {

Review Comment:
   I would expect that we already have a similar test. How about 
extending `testInitializeAsCandidateAndBecomeLeader` with any additional 
checks done in this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to