fanyang89 commented on code in PR #1925: URL: https://github.com/apache/zookeeper/pull/1925#discussion_r1157117488
########## zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java: ########## @@ -701,6 +712,309 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version }); } + /** + * Tests a follower that has queued transactions in SyncRequestProcessor that are also already + * committed, with leader getting quorum for those transactions elsewhere in the ensemble, and + * then that the leader shuts down, triggering a new leader election, and partial resetting of + * state in the follower. + * In particular, this test was written to verify a bug where LearnerZooKeeperServer was not + * shut down, because shutdown() was erroneously called on the super class ZooKeeperServer, + * which led to its SyncRequestProcessor not being flushed during shutdown, and any queued + * transactions lost. This would only happen if the SyncRequestProcessor also crashed; this + * would happen as a consequence of the leader going down, causing the SendAckRequestProcessor + * to throw, and kill the sync thread. + * In the subsequent leader election, the quorum peer would use the committed state, even though + * this was not yet flushed to persistent storage, and never would be, after the sync thread died. + * If the correct server had been shut down, the queued transactions would instead either be + * flushed to persistent storage when the quorum peer shut down the old follower, or this would + * fail, causing state to be recreated from whatever state was already flushed, which again would + * be corrected in a DIFF from the new leader. + */ + @Test + public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { + + CountDownLatch followerSetUp = new CountDownLatch(1); + + class BlockingRequestProcessor implements RequestProcessor, Flushable { + final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later. 
+ + final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor + + BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public void processRequest(Request request) throws RequestProcessorException { + nextProcessor.processRequest(request); + } + + @Override + public void shutdown() { + phaser.forceTermination(); + nextProcessor.shutdown(); + } + + @Override + public void flush() throws IOException { + phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing. + phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating slow fsync, etc.. + nextProcessor.flush(); + } + + } + + class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer { + + BlockingRequestProcessor blocker; + + BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self, zkDb); + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor.start(); + firstProcessor = new FollowerRequestProcessor(this, commitProcessor); + ((FollowerRequestProcessor) firstProcessor).start(); + blocker = new BlockingRequestProcessor(new SendAckRequestProcessor(getFollower())); + syncProcessor = new SyncRequestProcessor(this, blocker); + syncProcessor.start(); + followerSetUp.countDown(); + } + + } + + File followerDir = File.createTempFile("test", "dir", testData); + assertTrue(followerDir.delete()); + assertTrue(followerDir.mkdir()); + + File leaderDir = File.createTempFile("test", "dir", testData); + assertTrue(leaderDir.delete()); + assertTrue(leaderDir.mkdir()); + + Thread followerThread = null; + ConversableFollower follower = null; + QuorumPeer peer = null; + BlockingRequestProcessor 
blocker = null; + + try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) { + peer = createQuorumPeer(followerDir); + + FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir, followerDir); + peer.setTxnFactory(logFactory); + ZKDatabase zkDb = new ZKDatabase(logFactory); + BlockingFollowerZooKeeperServer zk = new BlockingFollowerZooKeeperServer(logFactory, peer, zkDb); + peer.setZKDatabase(zkDb); + follower = new ConversableFollower(peer, zk); + follower.setLeaderQuorumServer(new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress())); + peer.follower = follower; + + CompletableFuture<Exception> followerExit = new CompletableFuture<>(); + final Follower followerForThread = follower; + followerThread = new Thread(() -> { + try { + followerForThread.followLeader(); + followerExit.complete(null); + } catch (Exception e) { + LOG.warn("Unexpected exception in follower thread", e); + followerExit.complete(e); + } + }); + followerThread.start(); + + Socket leaderSocket = ss.accept(); + InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream()); + OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream()); + + assertEquals(0, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Set up a database with a single /foo node, on the leader + final long firstZxid = ZxidUtils.makeZxid(1, 1); + ZKDatabase leaderZkDb = new ZKDatabase(new FileTxnSnapLog(leaderDir, leaderDir)); + leaderZkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, OpCode.create), new CreateTxn("/foo", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1), null); + Stat stat = new Stat(); + assertEquals("data1", new String(leaderZkDb.getData("/foo", stat, null))); + + QuorumPacket qp = new QuorumPacket(); + readPacketSkippingPing(ia, qp); + assertEquals(Leader.FOLLOWERINFO, qp.getType()); + assertEquals(qp.getZxid(), 0); + LearnerInfo learnInfo = new LearnerInfo(); + 
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); + assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getServerid(), 0); + + // We are simulating an established leader, so the epoch is 1 + qp.setType(Leader.LEADERINFO); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + byte[] protoBytes = new byte[4]; + ByteBuffer.wrap(protoBytes).putInt(0x10000); + qp.setData(protoBytes); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACKEPOCH, qp.getType()); + assertEquals(0, qp.getZxid()); + assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Send a diff with a single PROPOSAL, to be COMMITTed after NEWLEADER + qp.setType(Leader.DIFF); + qp.setData(new byte[0]); + qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid()); + oa.writeRecord(qp, null); + + long createZxid0 = ZxidUtils.makeZxid(1, 2); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid0); + TxnHeader hdr = new TxnHeader(13, 1313, createZxid0, 33, OpCode.create); + CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + // Required for the ZK server to start up. + qp.setType(Leader.NEWLEADER); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + qp.setData(null); + oa.writeRecord(qp, null); + + // Quorum was acquired for the previous PROPOSAL, which is now COMMITTed. + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid0); + oa.writeRecord(qp, null); + + qp.setType(Leader.UPTODATE); + qp.setZxid(0); + oa.writeRecord(qp, null); + + // Get the ACK of the new leader. 
+ readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); + + // Read the PROPOSAL ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createZxid0, qp.getZxid()); + + // Read the UPTODATE ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + + // The follower now starts following the leader. + // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.register(); + long createZxid1 = ZxidUtils.makeZxid(1, 3); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid1); + hdr = new TxnHeader(13, 1313, createZxid1, 33, OpCode.create); + ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid1); + oa.writeRecord(qp, null); + + // Wait for "fsync" to begin. + assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); + blocker.phaser.arriveAndAwaitAdvance(); + + // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. + // They will not be attempted flushed yet, because the ongoing "fsync" is slow (waiting on the phaser). 
+ long createZxid2 = ZxidUtils.makeZxid(1, 4); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid2); + hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create); + ct = new CreateTxn("/baz", "bye".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid2); + oa.writeRecord(qp, null); + + // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately, + // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated. + long doom = System.currentTimeMillis() + 1000; + while (createZxid1 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { + Thread.sleep(1); + } + assertEquals(createZxid1, follower.fzk.getLastProcessedZxid()); Review Comment: Running the unit test shows that this assertion does not always hold. After txn(1, counter=3) is flushed, SyncRequestProcessor can take() txn(1, 4) and add it to the `toFlush` queue without calling flush(); then poll() returns null, and the processor flushes. So txn(1, 4) may or may not be flushed, depending on the order in which these events happen. A simple workaround is to enable flushDelay (via zookeeper.flushDelay), so that a flush of txn(1, 4) is not triggered when poll() returns null. Alternatively, maybe add a barrier? The failure is more likely to happen on earlier JDK versions (e.g., 1.8, 10, 11); it has yet to occur on JDK 18 in my test environment, but why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@zookeeper.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org