fanyang89 commented on code in PR #1925:
URL: https://github.com/apache/zookeeper/pull/1925#discussion_r1157117488
##
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##
@@ -701,6 +712,309 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version
});
}
+/**
+ * Tests a follower that has queued transactions in SyncRequestProcessor that are also already
+ * committed, with the leader getting quorum for those transactions elsewhere in the ensemble, and
+ * then the leader shutting down, triggering a new leader election and a partial reset of the
+ * follower's state.
+ * <p>In particular, this test was written to verify the fix for a bug where LearnerZooKeeperServer
+ * was not shut down, because shutdown() was erroneously called on the superclass ZooKeeperServer.
+ * This led to its SyncRequestProcessor not being flushed during shutdown, and to any queued
+ * transactions being lost. This would only happen if the SyncRequestProcessor also crashed, which
+ * would happen as a consequence of the leader going down: the SendAckRequestProcessor would throw
+ * and kill the sync thread.
+ * <p>In the subsequent leader election, the quorum peer would use the committed state, even though
+ * that state had not yet been flushed to persistent storage, and never would be once the sync
+ * thread had died.
+ * <p>If the correct server had been shut down, the queued transactions would instead either be
+ * flushed to persistent storage when the quorum peer shut down the old follower, or that flush would
+ * fail, causing state to be recreated from whatever had already been flushed; that state would in
+ * turn be corrected by a DIFF from the new leader.
+ */
+@Test
+public void testFollowerWithPendingSyncsOnLeaderReElection() throws
Exception {
+
+CountDownLatch followerSetUp = new CountDownLatch(1);
+
+        /**
+         * Pass-through request processor whose {@link #flush()} can be held up by the test.
+         *
+         * <p>Requests and shutdown are forwarded to the wrapped {@link SendAckRequestProcessor}.
+         * Before forwarding a flush, the calling thread arrives at {@link #phaser} twice. The first
+         * arrival signals that a flush has started. The second waits while the test thread does more
+         * work, simulating a slow fsync.
+         *
+         * <p>The phaser starts with a single registered party (the SyncRequestProcessor thread). While
+         * that is the only party, every arrival advances the phase immediately, so flushes do not
+         * block until the test thread registers itself.
+         */
+        class BlockingRequestProcessor implements RequestProcessor, Flushable {
+
+            // One party for the SyncRequestProcessor thread; the test thread registers later.
+            final Phaser phaser = new Phaser(1);
+
+            final SendAckRequestProcessor nextProcessor;
+
+            /**
+             * @param nextProcessor processor that requests, flushes and shutdown are forwarded to
+             */
+            BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) {
+                this.nextProcessor = nextProcessor;
+            }
+
+            @Override
+            public void processRequest(Request request) throws RequestProcessorException {
+                nextProcessor.processRequest(request);
+            }
+
+            @Override
+            public void shutdown() {
+                // Terminating the phaser makes blocked and future arriveAndAwaitAdvance() calls
+                // return immediately, so a flush parked in flush() is released.
+                phaser.forceTermination();
+                nextProcessor.shutdown();
+            }
+
+            @Override
+            public void flush() throws IOException {
+                // First arrival: let the test thread know we're flushing.
+                phaser.arriveAndAwaitAdvance();
+                // Second arrival: let the test thread do more work while we wait here, simulating a
+                // slow fsync, etc.
+                phaser.arriveAndAwaitAdvance();
+                nextProcessor.flush();
+            }
+
+        }
+
+        /**
+         * Follower server whose request-processor chain routes the sync path through a
+         * {@code BlockingRequestProcessor}, so the test can stall flushes of queued transactions.
+         *
+         * <p>Counts down {@code followerSetUp} once all processors have been built and started.
+         */
+        class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer {
+
+            // Assigned in setupRequestProcessors(), before followerSetUp is counted down.
+            BlockingRequestProcessor blocker;
+
+            BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb)
+                    throws IOException {
+                super(logFactory, self, zkDb);
+            }
+
+            /**
+             * Builds and starts three paths:
+             * <ul>
+             *   <li>the commit path: CommitProcessor, then FinalRequestProcessor;</li>
+             *   <li>the first processor: FollowerRequestProcessor, which feeds the CommitProcessor;</li>
+             *   <li>the sync path: SyncRequestProcessor, then BlockingRequestProcessor, then
+             *       SendAckRequestProcessor.</li>
+             * </ul>
+             */
+            @Override
+            protected void setupRequestProcessors() {
+                RequestProcessor finalRp = new FinalRequestProcessor(this);
+                String serverIdText = Long.toString(getServerId());
+                commitProcessor = new CommitProcessor(finalRp, serverIdText, true, getZooKeeperServerListener());
+                commitProcessor.start();
+
+                FollowerRequestProcessor followerRp = new FollowerRequestProcessor(this, commitProcessor);
+                firstProcessor = followerRp;
+                followerRp.start();
+
+                SendAckRequestProcessor ackRp = new SendAckRequestProcessor(getFollower());
+                blocker = new BlockingRequestProcessor(ackRp);
+                syncProcessor = new SyncRequestProcessor(this, blocker);
+                syncProcessor.start();
+
+                // Signal that the chain, including blocker, has been built and started.
+                followerSetUp.countDown();
+            }
+
+        }
+
+File followerDir = File.createTempFile("test", "dir", testData);
+assertTrue(followerDir.delete());
+assertTrue(followerDir.mkdir());
+
+File leaderDir = File.createTempFile("test", "dir", testData);
+assertTrue(leaderDir.delete());
+assertTrue(leaderDir.mkdir());
+
+Thread followerThread = null;
+ConversableFollower follower = null;
+QuorumPeer peer = null;
+BlockingRequestProcessor blocker = null;
+
+try (ServerSocket ss = new ServerSocket(0, 50,
InetAddress.getByName("127.0.0.1"))) {
+peer = createQuorumPeer(followerDir);
+
+FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir,
followerDir);
+