vamossagar12 commented on a change in pull request #10468:
URL: https://github.com/apache/kafka/pull/10468#discussion_r616018841



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws 
Exception {
         assertFutureThrows(shutdownFuture, TimeoutException.class);
     }
 
+    @Test
+    public void testLeaderGracefulShutdownOnClose() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int lingerMs = 50;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(lingerMs)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        assertEquals(1L, context.log.endOffset().offset);
+
+        int epoch = context.currentEpoch();
+        assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
+
+        context.client.poll();
+        assertEquals(OptionalLong.of(lingerMs), 
context.messageQueue.lastPollTimeoutMs());
+
+        context.time.sleep(20);
+
+        // client closed now.
+        context.client.close();
+
+        // Flag for accepting appends should be toggled to false.
+        assertFalse(context.client.canAcceptAppends());
+
+        // acceptAppends flag set to false so no writes should be accepted by 
the Leader now.
+        assertNull(context.client.scheduleAppend(epoch, singletonList("b")));
+
+        // The leader should trigger a flush for whatever batches are present 
in the BatchAccumulator
+        assertEquals(2L, context.log.endOffset().offset);
+
+        // Now shutdown
+
+        // We should still be running until we have had a chance to send 
EndQuorumEpoch
+        assertTrue(context.client.isShuttingDown());
+        assertTrue(context.client.isRunning());
+
+        // Send EndQuorumEpoch request to the other voter
+        context.pollUntilRequest();
+        assertTrue(context.client.isShuttingDown());
+        assertTrue(context.client.isRunning());
+        context.assertSentEndQuorumEpochRequest(1, otherNodeId);
+
+        // We should still be able to handle vote requests during graceful 
shutdown
+        // in order to help the new leader get elected
+        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 
epoch, 1L));
+        context.client.poll();
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+

Review comment:
       No problem, @jsancio. Please review whenever you get the chance :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to