jsancio commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r656397806



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -143,6 +144,7 @@
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
     public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
 
+    private final AtomicInteger resignedEpoch = new AtomicInteger(-1);

Review comment:
       I am wondering if we should move this state to `LeaderState`. Based on 
the comments and implementation of `resign`, this operation is only valid when 
the raft client is the leader. I think the implementation in this PR is 
functionally correct but it looks like `resignedEpoch` is always increasing. 
The raft client never sets resignedEpoch back to `-1` after resigning.
   
   An additional benefit of moving it to `LeaderState` is that `resignedEpoch` 
will be reset to `-1` after the client becomes leader again.
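
   A minimal sketch of that idea, using hypothetical stand-in classes 
(`ToyLeaderState` and `ToyQuorum` are not the real `LeaderState` or 
`QuorumState`). It assumes a fresh leader state object is created for each 
leadership term, so the flag starts out cleared without an explicit reset:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for LeaderState; not the real class.
final class ToyLeaderState {
    private final int epoch;
    // Set by the caller of requestResign, read by whoever polls the state.
    private final AtomicBoolean resignRequested = new AtomicBoolean(false);

    ToyLeaderState(int epoch) {
        this.epoch = epoch;
    }

    int epoch() {
        return epoch;
    }

    // Returns true only if the request targets this leader's epoch.
    boolean requestResign(int requestedEpoch) {
        if (requestedEpoch != epoch) {
            return false;
        }
        resignRequested.set(true);
        return true;
    }

    boolean isResignRequested() {
        return resignRequested.get();
    }
}

// Hypothetical stand-in for the object that owns the current state.
final class ToyQuorum {
    private ToyLeaderState leaderState;

    // Each election win creates a new state object, so the flag resets implicitly.
    ToyLeaderState becomeLeader(int epoch) {
        leaderState = new ToyLeaderState(epoch);
        return leaderState;
    }
}
```

   This glosses over how a `resign` call from another thread would reach the 
leader state safely, which the real change would still need to handle.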

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);

Review comment:
       Hmm. The state would never change because `poll` was never called. Maybe 
we should sleep for more than the election timeout and poll to make sure that 
the resign was ignored?
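
   To illustrate the concern with a toy model (`ToyClient` is hypothetical, 
not `KafkaRaftClient`): assuming `resign` only records the request and `poll` 
is what acts on it, an assertion made before any `poll` passes whether or not 
the stale epoch is actually rejected.

```java
import java.util.OptionalInt;

// Hypothetical toy client: resign() only records the request; poll() acts on it.
final class ToyClient {
    private final int epoch;
    private boolean leader = true;
    private OptionalInt pendingResignEpoch = OptionalInt.empty();

    ToyClient(int epoch) {
        this.epoch = epoch;
    }

    void resign(int requestedEpoch) {
        pendingResignEpoch = OptionalInt.of(requestedEpoch);
    }

    // Applies a pending resign only if it targets the current epoch, then clears it.
    void poll() {
        if (pendingResignEpoch.isPresent() && pendingResignEpoch.getAsInt() == epoch) {
            leader = false;
        }
        pendingResignEpoch = OptionalInt.empty();
    }

    boolean isLeader() {
        return leader;
    }
}
```

   In this model a client that resigns in its current epoch still reports 
`isLeader()` as true until `poll` runs, so only the check after `poll` 
distinguishes an ignored resign from a pending one.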

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);

Review comment:
       This comment applies to a few other places.
   
   It looks like this checks `quorumStateStore` directly. Should we check that 
`Listener.handleLeaderChange` is called or do we want to rely on other tests 
verifying that this function is called when the epoch changes?
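
   If we do want to assert on the listener, a recording listener along these 
lines would be enough. This is a simplified sketch with hypothetical types 
(`ToyLeaderAndEpoch` and `RecordingListener`), not the real 
`RaftClient.Listener` or `LeaderAndEpoch`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical simplified value type standing in for LeaderAndEpoch.
final class ToyLeaderAndEpoch {
    final OptionalInt leaderId;
    final int epoch;

    ToyLeaderAndEpoch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }
}

// Hypothetical listener that records every leader change it is told about.
final class RecordingListener {
    private final List<ToyLeaderAndEpoch> changes = new ArrayList<>();

    void handleLeaderChange(ToyLeaderAndEpoch leaderAndEpoch) {
        changes.add(leaderAndEpoch);
    }

    // Most recent notification, or null if none has been delivered.
    ToyLeaderAndEpoch last() {
        return changes.isEmpty() ? null : changes.get(changes.size() - 1);
    }
}
```

   A test could then compare `last()` against the expected leader and epoch 
instead of, or in addition to, reading the persisted quorum state.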

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumRequest` once the other voter has acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned st
+        // ate should result in a NOT_LEADER error.

Review comment:
       Let's move at least "ate" to the line above so that the word "state" is 
not split over two lines

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -171,9 +171,9 @@ default void beginShutdown() {}
     CompletableFuture<Void> shutdown(int timeoutMs);
 
     /**
-     * Resign the leadership. The leader will give up its leadership in the current epoch,
-     * and a new election will be held. Note that nothing prevents this leader from getting
-     * reelected.
+     * Resign the leadership. The leader will give up its leadership in the passed epoch
+     * (if it matches the current epoch), and a new election will be held. Note that nothing
+     * prevents this node from being reelected as the leader.

Review comment:
       Let's add a comment that the listener can learn about the "success" of 
this operation through `handleLeaderChange`

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1917,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || resignedEpoch.get() == state.epoch()) {

Review comment:
       Adding this comment here, but it also applies to `pollUnattached` and 
`pollFollower`. This is a little unrelated, but it looks like in the observer 
case we never check the `shutdown` variable.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2252,34 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {

Review comment:
       `0` is also an invalid epoch, right? Or does the raft client send 
`handleLeaderChange(LeaderAndEpoch(OptionalInt.empty(), 0))` to the listener?
   
   I still think it is fair to say that the raft client will never send 
`handleLeaderChange(LeaderAndEpoch(OptionalInt.of(...), 0))`




