fanyang89 commented on code in PR #1925:
URL: https://github.com/apache/zookeeper/pull/1925#discussion_r1157117488


##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -701,6 +712,309 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version
         });
     }
 
+    /**
+     * Tests a follower that has queued transactions in SyncRequestProcessor that are also already
+     * committed, with leader getting quorum for those transactions elsewhere in the ensemble, and
+     * then that the leader shuts down, triggering a new leader election, and partial resetting of
+     * state in the follower.
+     * In particular, this test was written to verify a bug where LearnerZooKeeperServer was not
+     * shut down, because shutdown() was erroneously called on the super class ZooKeeperServer,
+     * which led to its SyncRequestProcessor not being flushed during shutdown, and any queued
+     * transactions lost. This would only happen if the SyncRequestProcessor also crashed; this
+     * would happen as a consequence of the leader going down, causing the SendAckRequestProcessor
+     * to throw, and kill the sync thread.
+     * In the subsequent leader election, the quorum peer would use the committed state, even though
+     * this was not yet flushed to persistent storage, and never would be, after the sync thread died.
+     * If the correct server had been shut down, the queued transactions would instead either be
+     * flushed to persistent storage when the quorum peer shut down the old follower, or this would
+     * fail, causing state to be recreated from whatever state was already flushed, which again would
+     * be corrected in a DIFF from the new leader.
+     */
+    @Test
+    public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception {
+
+        CountDownLatch followerSetUp = new CountDownLatch(1);
+
+        class BlockingRequestProcessor implements RequestProcessor, Flushable {
+            final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later.
+
+            final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor
+
+            BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) {
+                this.nextProcessor = nextProcessor;
+            }
+
+            @Override
+            public void processRequest(Request request) throws RequestProcessorException {
+                nextProcessor.processRequest(request);
+            }
+
+            @Override
+            public void shutdown() {
+                phaser.forceTermination();
+                nextProcessor.shutdown();
+            }
+
+            @Override
+            public void flush() throws IOException {
+                phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing.
+                phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating slow fsync, etc.
+                nextProcessor.flush();
+            }
+
+        }
+
+        class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer {
+
+            BlockingRequestProcessor blocker;
+
+            BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
+                super(logFactory, self, zkDb);
+            }
+
+            @Override
+            protected void setupRequestProcessors() {
+                RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+                commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
+                commitProcessor.start();
+                firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
+                ((FollowerRequestProcessor) firstProcessor).start();
+                blocker = new BlockingRequestProcessor(new SendAckRequestProcessor(getFollower()));
+                syncProcessor = new SyncRequestProcessor(this, blocker);
+                syncProcessor.start();
+                followerSetUp.countDown();
+            }
+
+        }
+
+        File followerDir = File.createTempFile("test", "dir", testData);
+        assertTrue(followerDir.delete());
+        assertTrue(followerDir.mkdir());
+
+        File leaderDir = File.createTempFile("test", "dir", testData);
+        assertTrue(leaderDir.delete());
+        assertTrue(leaderDir.mkdir());
+
+        Thread followerThread = null;
+        ConversableFollower follower = null;
+        QuorumPeer peer = null;
+        BlockingRequestProcessor blocker = null;
+
+        try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) {
+            peer = createQuorumPeer(followerDir);
+
+            FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir, followerDir);
+            peer.setTxnFactory(logFactory);
+            ZKDatabase zkDb = new ZKDatabase(logFactory);
+            BlockingFollowerZooKeeperServer zk = new BlockingFollowerZooKeeperServer(logFactory, peer, zkDb);
+            peer.setZKDatabase(zkDb);
+            follower = new ConversableFollower(peer, zk);
+            follower.setLeaderQuorumServer(new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress()));
+            peer.follower = follower;
+
+            CompletableFuture<Exception> followerExit = new CompletableFuture<>();
+            final Follower followerForThread = follower;
+            followerThread = new Thread(() -> {
+                try {
+                    followerForThread.followLeader();
+                    followerExit.complete(null);
+                } catch (Exception e) {
+                    LOG.warn("Unexpected exception in follower thread", e);
+                    followerExit.complete(e);
+                }
+            });
+            followerThread.start();
+
+            Socket leaderSocket = ss.accept();
+            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());
+
+            assertEquals(0, follower.self.getAcceptedEpoch());
+            assertEquals(0, follower.self.getCurrentEpoch());
+
+            // Set up a database with a single /foo node, on the leader
+            final long firstZxid = ZxidUtils.makeZxid(1, 1);
+            ZKDatabase leaderZkDb = new ZKDatabase(new FileTxnSnapLog(leaderDir, leaderDir));
+            leaderZkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, OpCode.create), new CreateTxn("/foo", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1), null);
+            Stat stat = new Stat();
+            assertEquals("data1", new String(leaderZkDb.getData("/foo", stat, null)));
+
+            QuorumPacket qp = new QuorumPacket();
+            readPacketSkippingPing(ia, qp);
+            assertEquals(Leader.FOLLOWERINFO, qp.getType());
+            assertEquals(qp.getZxid(), 0);
+            LearnerInfo learnInfo = new LearnerInfo();
+            ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+            assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+            assertEquals(learnInfo.getServerid(), 0);
+
+            // We are simulating an established leader, so the epoch is 1
+            qp.setType(Leader.LEADERINFO);
+            qp.setZxid(ZxidUtils.makeZxid(1, 0));
+            byte[] protoBytes = new byte[4];
+            ByteBuffer.wrap(protoBytes).putInt(0x10000);
+            qp.setData(protoBytes);
+            oa.writeRecord(qp, null);
+
+            readPacketSkippingPing(ia, qp);
+            assertEquals(Leader.ACKEPOCH, qp.getType());
+            assertEquals(0, qp.getZxid());
+            assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+            assertEquals(1, follower.self.getAcceptedEpoch());
+            assertEquals(0, follower.self.getCurrentEpoch());
+
+            // Send a diff with a single PROPOSAL, to be COMMITTed after NEWLEADER
+            qp.setType(Leader.DIFF);
+            qp.setData(new byte[0]);
+            qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid());
+            oa.writeRecord(qp, null);
+
+            long createZxid0 = ZxidUtils.makeZxid(1, 2);
+            qp.setType(Leader.PROPOSAL);
+            qp.setZxid(createZxid0);
+            TxnHeader hdr = new TxnHeader(13, 1313, createZxid0, 33, OpCode.create);
+            CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+            boa.writeRecord(hdr, null);
+            boa.writeRecord(ct, null);
+            qp.setData(baos.toByteArray());
+            oa.writeRecord(qp, null);
+
+            // Required for the ZK server to start up.
+            qp.setType(Leader.NEWLEADER);
+            qp.setZxid(ZxidUtils.makeZxid(1, 0));
+            qp.setData(null);
+            oa.writeRecord(qp, null);
+
+            // Quorum was acquired for the previous PROPOSAL, which is now COMMITTed.
+            qp.setType(Leader.COMMIT);
+            qp.setZxid(createZxid0);
+            oa.writeRecord(qp, null);
+
+            qp.setType(Leader.UPTODATE);
+            qp.setZxid(0);
+            oa.writeRecord(qp, null);
+
+            // Get the ACK of the new leader.
+            readPacketSkippingPing(ia, qp);
+            assertEquals(Leader.ACK, qp.getType());
+            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+            assertEquals(1, follower.self.getAcceptedEpoch());
+            assertEquals(1, follower.self.getCurrentEpoch());
+
+            // Read the PROPOSAL ack.
+            readPacketSkippingPing(ia, qp);
+            assertEquals(Leader.ACK, qp.getType());
+            assertEquals(createZxid0, qp.getZxid());
+
+            // Read the UPTODATE ack.
+            readPacketSkippingPing(ia, qp);
+            assertEquals(Leader.ACK, qp.getType());
+            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+            // The follower now starts following the leader.
+            // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor.
+            blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
+            blocker.phaser.register();
+            long createZxid1 = ZxidUtils.makeZxid(1, 3);
+            qp.setType(Leader.PROPOSAL);
+            qp.setZxid(createZxid1);
+            hdr = new TxnHeader(13, 1313, createZxid1, 33, OpCode.create);
+            ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1);
+            baos = new ByteArrayOutputStream();
+            boa = BinaryOutputArchive.getArchive(baos);
+            boa.writeRecord(hdr, null);
+            boa.writeRecord(ct, null);
+            qp.setData(baos.toByteArray());
+            oa.writeRecord(qp, null);
+
+            qp.setType(Leader.COMMIT);
+            qp.setZxid(createZxid1);
+            oa.writeRecord(qp, null);
+
+            // Wait for "fsync" to begin.
+            assertTrue(followerSetUp.await(10, TimeUnit.SECONDS));
+            blocker.phaser.arriveAndAwaitAdvance();
+
+            // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree.
+            // No flush will be attempted for them yet, because the ongoing "fsync" is slow (waiting on the phaser).
+            long createZxid2 = ZxidUtils.makeZxid(1, 4);
+            qp.setType(Leader.PROPOSAL);
+            qp.setZxid(createZxid2);
+            hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create);
+            ct = new CreateTxn("/baz", "bye".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1);
+            baos = new ByteArrayOutputStream();
+            boa = BinaryOutputArchive.getArchive(baos);
+            boa.writeRecord(hdr, null);
+            boa.writeRecord(ct, null);
+            qp.setData(baos.toByteArray());
+            oa.writeRecord(qp, null);
+
+            qp.setType(Leader.COMMIT);
+            qp.setZxid(createZxid2);
+            oa.writeRecord(qp, null);
+
+            // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately,
+            // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated.
+            long doom = System.currentTimeMillis() + 1000;
+            while (createZxid1 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) {
+                Thread.sleep(1);
+            }
+            assertEquals(createZxid1, follower.fzk.getLastProcessedZxid());

Review Comment:
   Running the unit test shows that this assertion does not always hold.
   After txn (1, counter=3) is flushed, SyncRequestProcessor can take() txn (1, 4) and add it to the `toFlush` queue without calling flush(); the next poll() then returns null, and the processor flushes. So txn (1, 4) may or may not be flushed at this point, depending on the order of these steps.
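To make the ordering dependence concrete, here is a deliberately simplified, hypothetical model of the batching loop. It is not the real SyncRequestProcessor: it assumes flushDelay is 0 (so the first empty poll() flushes immediately), ignores maxBatchSize, snapshotting and the blocking flush used in the test, and represents each txn only by its zxid counter (3 and 4).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, single-threaded model of a group-commit loop in the style of
 * SyncRequestProcessor with flushDelay == 0 (not the real implementation):
 * every request found by poll() is appended to toFlush, and the first poll()
 * that finds the queue empty flushes the whole batch.
 */
final class GroupCommitModel {

    /**
     * @param arrivalsPerTick for each tick, the txn counters that are already queued
     *                        by the time the processor starts polling again
     * @return the batches in the order they were flushed
     */
    static List<List<Long>> simulate(List<List<Long>> arrivalsPerTick) {
        Deque<Long> queue = new ArrayDeque<>();
        List<Long> toFlush = new ArrayList<>();
        List<List<Long>> flushedBatches = new ArrayList<>();
        for (List<Long> arrivals : arrivalsPerTick) {
            queue.addAll(arrivals);
            // Zero-timeout poll: keep batching while requests are already waiting.
            Long next;
            while ((next = queue.poll()) != null) {
                toFlush.add(next);
            }
            // poll() returned null: nothing left to batch, so flush immediately.
            if (!toFlush.isEmpty()) {
                flushedBatches.add(new ArrayList<>(toFlush));
                toFlush.clear();
            }
        }
        return flushedBatches;
    }
}
```

In this model, `simulate(List.of(List.of(3L), List.of(4L)))` flushes `[3]` and `[4]` as separate batches, while `simulate(List.of(List.of(3L, 4L)))` flushes a single batch `[3, 4]`; the only difference is whether txn 4 was already queued when the loop polled.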
   A simple workaround is to enable a flush delay (via `zookeeper.flushDelay`) so that the flush covering txn (1, 4) is not triggered by poll() returning null. Alternatively, maybe add a barrier?
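As for the barrier, one possible shape is sketched below. `ZxidBarrier`, `onFlushed()`, `awaitFlushed()` and `isFlushed()` are hypothetical names, not ZooKeeper APIs; the sketch assumes the test's BlockingRequestProcessor would report the zxid of each request it receives in processRequest(), so the test could wait for an exact zxid instead of sleep-polling `getLastProcessedZxid()`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test helper (not part of ZooKeeper): one one-shot latch per zxid,
 * so a test thread can block until a specific transaction has been reported.
 */
final class ZxidBarrier {
    private final ConcurrentMap<Long, CountDownLatch> latches = new ConcurrentHashMap<>();

    // Create latches lazily, so signalling and waiting may happen in either order.
    private CountDownLatch latchFor(long zxid) {
        return latches.computeIfAbsent(zxid, z -> new CountDownLatch(1));
    }

    /** Called from the processor thread once the request with this zxid has been flushed. */
    void onFlushed(long zxid) {
        latchFor(zxid).countDown();
    }

    /** Called from the test thread; returns false on timeout or interruption. */
    boolean awaitFlushed(long zxid, long timeout, TimeUnit unit) {
        try {
            return latchFor(zxid).await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Non-blocking check, e.g. to assert that a later zxid has NOT been flushed yet. */
    boolean isFlushed(long zxid) {
        return latchFor(zxid).getCount() == 0;
    }
}
```

The test could then assert `barrier.awaitFlushed(createZxid1, 10, TimeUnit.SECONDS)` and inspect `barrier.isFlushed(createZxid2)`, so an unexpected batching of txn (1, 4) shows up as an explicit condition rather than a timing-dependent one.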
   This seems more likely to happen on earlier JDK versions (e.g., 1.8, 10, 11); it has not yet occurred on JDK 18 in my test environment, and I am not sure why.
   
   


