mumrah commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r659989672
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2250,42 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        if (!quorum.isVoter()) {
+            throw new IllegalArgumentException("Attempt to resign by a non-voter");

Review comment:
   Would IllegalStateException be better here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   unrelated, but maybe worth creating helper method that returns `Optional<GracefulShutdown>` to avoid these null checks throughout

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        if (!quorum.isVoter()) {
+            throw new IllegalArgumentException("Attempt to resign by a non-voter");
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            return;
+        } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
   Does it make sense to do this check before the epoch validation? If we're not the leader and received an old epoch (which, if i understand, seems likely if we're _not_ the leader anymore), we will silently ignore in the above case.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1915,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || state.isResignRequested()) {

Review comment:
   Works for me 👍

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2249,40 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        if (!quorum.isVoter()) {
+            throw new IllegalArgumentException("Attempt to resign by a non-voter");
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            return;
+        } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {

Review comment:
   I see what you mean, and yea that is a fair point 👍
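
For reference, the `Optional<GracefulShutdown>` helper suggested in the `pollLeader` comment could look roughly like the sketch below. It is not code from the PR. The class `ShutdownHelperSketch`, the methods `pendingShutdown()`, `beginShutdown()` and `shouldResign(...)`, and the empty `GracefulShutdown` stand-in are all hypothetical names used for illustration, and the `shutdown` field is assumed to be an `AtomicReference` based only on the `shutdown.get()` calls visible in the diff.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class ShutdownHelperSketch {
    // Hypothetical stand-in for the real GracefulShutdown class.
    static final class GracefulShutdown {}

    // Assumption: the field is an AtomicReference, inferred from `shutdown.get()` in the diff.
    private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();

    // Hypothetical helper: wraps the nullable reference so callers never see null.
    Optional<GracefulShutdown> pendingShutdown() {
        return Optional.ofNullable(shutdown.get());
    }

    // Hypothetical setter, present only so the sketch can be exercised.
    void beginShutdown() {
        shutdown.set(new GracefulShutdown());
    }

    // Mirrors the condition `shutdown.get() != null || state.isResignRequested()`
    // from the diff, with the resign flag passed in as a plain boolean.
    boolean shouldResign(boolean resignRequested) {
        return pendingShutdown().isPresent() || resignRequested;
    }
}
```

With a helper of this shape, call sites would test `pendingShutdown().isPresent()` instead of comparing against `null`.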