hachikuji commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r660007360



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
       That's fair. Let me see whether it's reasonable to do this here or 
whether we should push it to a separate PR.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        if (!quorum.isVoter()) {
+            throw new IllegalArgumentException("Attempt to resign by a non-voter");
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            return;
+        } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
       If the epoch has moved on, then the leader check is likely to fail, so 
the current order seems to make sense. We don't keep a history of previous 
states, so I think the best we can do is catch cases where the passed epoch 
does not make sense with the current state.
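
       To make the ordering concrete, here is a minimal, self-contained 
sketch of the check sequence described above. It is not the `KafkaRaftClient` 
code: `ResignCheckSketch`, `Outcome`, and `check` are hypothetical names, the 
voter check is left out, and the final branch is assumed to reject the call 
because the diff above is cut off at that point.

```java
import java.util.OptionalInt;

/** Hypothetical stand-in for the validation order under discussion; not the Kafka class. */
final class ResignCheckSketch {
    enum Outcome { RESIGN, ALREADY_FULFILLED }

    /**
     * @param requestedEpoch epoch passed by the caller
     * @param currentEpoch   epoch the local node currently observes
     * @param currentLeader  leader id for currentEpoch, empty if unknown
     * @param localId        id of the local node
     */
    static Outcome check(int requestedEpoch, int currentEpoch,
                         OptionalInt currentLeader, int localId) {
        if (requestedEpoch < 0) {
            throw new IllegalArgumentException("Negative epoch " + requestedEpoch);
        }
        if (requestedEpoch > currentEpoch) {
            // A future epoch cannot be valid for any state we have seen.
            throw new IllegalArgumentException("Epoch " + requestedEpoch +
                " is larger than the current epoch " + currentEpoch);
        }
        if (requestedEpoch < currentEpoch) {
            // The epoch has moved on, so the caller may simply be behind.
            // Checking leadership first would reject this benign case.
            return Outcome.ALREADY_FULFILLED;
        }
        // Only when the epochs match does the leader check tell us anything.
        boolean isLeader = currentLeader.isPresent() && currentLeader.getAsInt() == localId;
        if (!isLeader) {
            // Assumption: the real code rejects this case; the diff does not show it.
            throw new IllegalArgumentException("Not the leader in epoch " + requestedEpoch);
        }
        return Outcome.RESIGN;
    }
}
```

       With this order, `check(3, 5, OptionalInt.of(2), 1)` returns 
`ALREADY_FULFILLED` rather than failing the leader check, which is the case 
the stale-epoch branch is meant to cover.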

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
       If you don't mind, let's do this refactor separately. There are a fair 
number of uses that would benefit from having `Optional<GracefulShutdown>`.
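
       For reference when that follow-up happens, a rough sketch of what the 
`Optional`-based check could look like. Everything here is an assumption for 
illustration: `ShutdownOptionalSketch`, the `shutdown()` accessor, 
`beginShutdown`, and the empty `GracefulShutdown` placeholder are hypothetical, 
and the backing field is assumed to be an `AtomicReference` because the diff 
reads it with `shutdown.get()`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of an Optional-returning accessor; not the Kafka class. */
final class ShutdownOptionalSketch {
    /** Placeholder for the real GracefulShutdown type, which is not shown in this thread. */
    static final class GracefulShutdown { }

    // Assumed shape of the field: null until a shutdown has been requested.
    private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();

    /** Wraps the nullable reference so callers no longer compare against null. */
    Optional<GracefulShutdown> shutdown() {
        return Optional.ofNullable(shutdown.get());
    }

    void beginShutdown() {
        shutdown.set(new GracefulShutdown());
    }

    /** Mirrors the condition in the diff: shutdown in progress or resignation requested. */
    boolean shouldResign(boolean resignRequested) {
        return shutdown().isPresent() || resignRequested;
    }
}
```

       The condition itself does not change; the accessor only moves the 
null handling into one place, which is why it can wait for a separate PR.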



