hachikuji commented on a change in pull request #10913: URL: https://github.com/apache/kafka/pull/10913#discussion_r656424358
########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ########## @@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception { assertFalse(context.client.isShuttingDown()); } + @Test + public void testResignInOlderEpochIgnored() throws Exception { + int localId = 0; + int otherNodeId = 1; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + int currentEpoch = context.currentEpoch(); + context.client.resign(currentEpoch - 1); + context.assertElectedLeader(currentEpoch, localId); + } + + @Test + public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception { + int localId = 0; + int remoteId1 = 1; + int remoteId2 = 2; + Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + int resignedEpoch = context.currentEpoch(); + + context.client.resign(resignedEpoch); + context.pollUntil(context.client.quorum()::isResigned); + + context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1)); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.NONE); + context.assertElectedLeader(resignedEpoch + 1, remoteId1); + } + + @Test + public void testElectionTimeoutAfterUserInitiatedResign() throws Exception { + int localId = 0; + int otherNodeId = 1; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + int resignedEpoch = context.currentEpoch(); + + context.client.resign(resignedEpoch); + context.pollUntil(context.client.quorum()::isResigned); + + context.pollUntilRequest(); + int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId); + + EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse( + Errors.NONE, + context.metadataPartition, + Errors.NONE, + resignedEpoch, + localId + ); + + context.deliverResponse(correlationId, otherNodeId, response); + context.client.poll(); + + // We do not resend `EndQuorumRequest` once the other voter has acknowledged it. + context.time.sleep(context.retryBackoffMs); + context.client.poll(); + assertFalse(context.channel.hasSentRequests()); + + // Any `Fetch` received in the resigned st + // ate should result in a NOT_LEADER error. Review comment: yep. errant return snuck in there -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org