jsancio commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r656616690
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -143,6 +144,7 @@
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
     public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;

+    private final AtomicInteger resignedEpoch = new AtomicInteger(-1);

Review comment:
Okay. We do similar synchronization for `append`. The `LeaderState` has an `epoch` and it is final. The part that may be tricky to implement is the `epoch < currentEpoch` case.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -1159,18 +1164,20 @@ void readBatch(BatchReader<String> reader) {
         }

         @Override
-        public void handleLeaderChange(LeaderAndEpoch leader) {
+        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
             // We record the next expected offset as the claimed epoch's start
-            // offset. This is useful to verify that the `handleClaim` callback
+            // offset. This is useful to verify that the `handleLeaderChange` callback
             // was not received early.
-            if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
+            if (localId.isPresent() && leaderAndEpoch.isLeader(localId.getAsInt())) {
                 long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
                     lastCommitOffset().getAsLong() + 1 : 0L;
-                this.currentClaimedEpoch = OptionalInt.of(leader.epoch());
-                this.claimedEpochStartOffsets.put(leader.epoch(), claimedEpochStartOffset);
+                this.currentClaimedEpoch = OptionalInt.of(leaderAndEpoch.epoch());

Review comment:
Since we are storing `currentLeaderAndEpoch`, I think that we can remove the variable `currentClaimedEpoch` and add a function that returns this epoch based on the value of `currentLeaderAndEpoch`. What do you think?
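For illustration, the suggestion in the second review comment (drop the `currentClaimedEpoch` field and derive it from `currentLeaderAndEpoch`) could look roughly like the sketch below. This is not code from the PR: `LeaderAndEpochSketch` and `ListenerSketch` are hypothetical stand-ins for `LeaderAndEpoch` and the test listener, modelling only the `isLeader` and `epoch` calls that appear in the diff, and the initial "no leader, epoch 0" value is an assumption.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for LeaderAndEpoch; only isLeader() and epoch(),
// the two calls visible in the diff, are modelled here.
final class LeaderAndEpochSketch {
    private final OptionalInt leaderId;
    private final int epoch;

    LeaderAndEpochSketch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }

    boolean isLeader(int nodeId) {
        return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
    }

    int epoch() {
        return epoch;
    }
}

// Hypothetical stand-in for the test listener. It stores only the latest
// LeaderAndEpoch and computes the claimed epoch on demand.
final class ListenerSketch {
    private final OptionalInt localId;
    // Assumption: start with "no leader, epoch 0" until the first callback.
    private LeaderAndEpochSketch currentLeaderAndEpoch =
        new LeaderAndEpochSketch(OptionalInt.empty(), 0);

    ListenerSketch(OptionalInt localId) {
        this.localId = localId;
    }

    void handleLeaderChange(LeaderAndEpochSketch leaderAndEpoch) {
        this.currentLeaderAndEpoch = leaderAndEpoch;
    }

    // Replaces the separate currentClaimedEpoch field: the epoch counts as
    // claimed only while the local node is the leader.
    OptionalInt currentClaimedEpoch() {
        if (localId.isPresent() && currentLeaderAndEpoch.isLeader(localId.getAsInt())) {
            return OptionalInt.of(currentLeaderAndEpoch.epoch());
        }
        return OptionalInt.empty();
    }
}
```

One consequence of this shape is that the claimed epoch empties itself as soon as a leader change to another node is recorded, so there is no second field to keep in sync.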
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1917,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);

-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || resignedEpoch.get() == state.epoch()) {

Review comment:
Got it. For future readers the function is `maybeCompleteShutdown`.
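As a rough sketch of the check discussed in the `pollLeader` hunk, the class below models an `AtomicInteger` that starts at `-1` and is compared against the leader's epoch. Only the field, its `-1` initial value, and the `shutdown.get() != null || resignedEpoch.get() == state.epoch()` condition come from the diff. `ResignationSketch`, `requestResign`, and `shouldResign` are hypothetical names, and the `Math.max` update is an assumed way to handle the `epoch < currentEpoch` case mentioned in the first review comment, not necessarily what the PR does.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the resignedEpoch check; not the KafkaRaftClient code.
final class ResignationSketch {
    // -1 means that no resignation has been requested, as in the diff.
    private final AtomicInteger resignedEpoch = new AtomicInteger(-1);

    // May be called from another thread. Assumption: a request for an older
    // epoch never overwrites a newer one, so the maximum is kept.
    void requestResign(int epoch) {
        resignedEpoch.updateAndGet(current -> Math.max(current, epoch));
    }

    // Mirrors the condition in pollLeader: act on a pending shutdown, or on a
    // resignation requested for exactly the epoch this leader holds.
    boolean shouldResign(int leaderEpoch, boolean shutdownRequested) {
        return shutdownRequested || resignedEpoch.get() == leaderEpoch;
    }
}
```

Because the comparison is an equality check, a request recorded for an earlier epoch has no effect once the node leads a later epoch, which is one simple way to make stale requests harmless.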