dengziming commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r655833840



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws 
Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws 
Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, 
remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception 
{
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = 
context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = 
EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumRequest` once the other voter has 
acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned st
+        // ate should result in a NOT_LEADER error.
+        context.deliverRequest(context.fetchRequest(1, -1, 0, 0, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER,
+            resignedEpoch, OptionalInt.of(localId));
+
+        // After the election timer, we should become a candidate.
+        context.time.sleep(2 * context.electionTimeoutMs());
+        context.pollUntil(context.client.quorum()::isCandidate);
+        assertEquals(resignedEpoch + 1, context.currentEpoch());
+    }
+
+    @Test
+    public void testCannotResignWithLargerEpochThanCurrentEpoch() throws 
Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+        context.becomeLeader();
+
+        assertThrows(IllegalArgumentException.class, () -> {

Review comment:
       nit: the braces around this single-statement lambda are unnecessary; an expression lambda would do.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws 
Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws 
Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, 
remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception 
{
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = 
context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = 
EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumRequest` once the other voter has 
acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned st
+        // ate should result in a NOT_LEADER error.

Review comment:
       nit: unnecessary line break in the middle of this comment ("st" / "ate" should read "state" on one line).

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws 
Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws 
Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, 
remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception 
{
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = 
context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = 
EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumRequest` once the other voter has 
acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned st
+        // ate should result in a NOT_LEADER error.
+        context.deliverRequest(context.fetchRequest(1, -1, 0, 0, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER,
+            resignedEpoch, OptionalInt.of(localId));
+
+        // After the election timer, we should become a candidate.
+        context.time.sleep(2 * context.electionTimeoutMs());
+        context.pollUntil(context.client.quorum()::isCandidate);
+        assertEquals(resignedEpoch + 1, context.currentEpoch());
+    }
+
+    @Test
+    public void testCannotResignWithLargerEpochThanCurrentEpoch() throws 
Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+        context.becomeLeader();
+
+        assertThrows(IllegalArgumentException.class, () -> {
+            context.client.resign(context.currentEpoch() + 1);
+        });
+    }
+
+    @Test
+    public void testCannotResignIfNotLeader() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int leaderEpoch = 2;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(leaderEpoch, otherNodeId)
+            .build();
+
+        assertEquals(OptionalInt.of(otherNodeId), context.currentLeader());
+        assertThrows(IllegalArgumentException.class, () -> {
+            context.client.resign(leaderEpoch);
+        });
+    }
+
+    @Test
+    public void testCannotResignIfObserver() throws Exception {
+        int leaderId = 1;
+        int otherNodeId = 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();
+        context.pollUntilRequest();
+
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        assertTrue(voters.contains(fetchRequest.destinationId()));
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+
+        context.deliverResponse(fetchRequest.correlationId, 
fetchRequest.destinationId(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.FENCED_LEADER_EPOCH));
+
+        context.client.poll();
+        context.assertElectedLeader(epoch, leaderId);
+
+        assertThrows(IllegalArgumentException.class, () -> {

Review comment:
       nit: the braces around this single-statement lambda are unnecessary here as well.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1917,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || resignedEpoch.get() == state.epoch()) {

Review comment:
       It seems that `resign` and `shutdown` are handled here without any 
distinction. What is the difference between them?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to