Author: breed Date: Fri Jul 23 06:31:57 2010 New Revision: 966984 URL: http://svn.apache.org/viewvc?rev=966984&view=rev Log: ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (fpj via breed)
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Fri Jul 23 06:31:57 2010 @@ -58,6 +58,8 @@ BUGFIXES: ZOOKEEPER-766. forrest recipes docs don't mention the lock/queue recipe implementations available in the release (phunt via mahadev) + ZOOKEEPER-790. 
Last processed zxid set prematurely while establishing leadership (fpj via breed) + Release 3.3.0 - 2010-03-24 Non-backward compatible changes: Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Fri Jul 23 06:31:57 2010 @@ -193,7 +193,12 @@ public class FileTxnLog implements TxnLo + hdr.getType()); } if (logStream==null) { - logFileWrite = new File(logDir, ("log." + + if(LOG.isInfoEnabled()){ + LOG.info("Creating new log file: log." + + Long.toHexString(hdr.getZxid())); + } + + logFileWrite = new File(logDir, ("log." 
+ Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Jul 23 06:31:57 2010 @@ -274,16 +274,15 @@ public class Leader { try { self.tick = 0; zk.loadData(); - zk.startup(); + long epoch = self.getLastLoggedZxid() >> 32L; epoch++; zk.setZxid(epoch << 32L); - zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); synchronized(this){ lastProposed = zk.getZxid(); } - + newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); @@ -327,6 +326,13 @@ public class Leader { Thread.sleep(self.tickTime); self.tick++; } + + if(LOG.isInfoEnabled()){ + LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid()); + } + zk.startup(); + zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); + if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { self.cnxnFactory.setZooKeeperServer(zk); } @@ -466,7 +472,7 @@ public class Leader { LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid) + " is " + p.ackSet.size()); } - if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ + if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ if (zxid != lastCommitted+1) { LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid) + " from " + followerAddr + " not first!"); @@ -603,6 +609,16 @@ public class Leader { long lastProposed; + + /** + * Returns the current epoch of the leader. 
+ * + * @return + */ + public long getEpoch(){ + return lastProposed >> 32L; + } + /** * create a proposal and send it out to all the members * Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Jul 23 06:31:57 2010 @@ -311,6 +311,9 @@ public class Learner { System.exit(13); } + if(LOG.isInfoEnabled()){ + LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L)); + } zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid); } ack.setZxid(newLeaderZxid & ~0xffffffffL); Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Jul 23 06:31:57 2010 @@ -211,48 +211,72 @@ public class QuorumBase extends ClientBa } JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); } - public void setupServers() throws IOException { + + public void setupServers() throws IOException { + setupServer(1); + setupServer(2); + setupServer(3); + setupServer(4); + setupServer(5); + } + + HashMap<Long,QuorumServer> peers = null; + public void 
setupServer(int i) throws IOException { int tickTime = 2000; int initLimit = 3; int syncLimit = 3; - HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(); + + if(peers == null){ + peers = new HashMap<Long,QuorumServer>(); - peers.put(Long.valueOf(1), new QuorumServer(1, + peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000), new InetSocketAddress("127.0.0.1", portLE1 + 1000), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(2), new QuorumServer(2, + peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000), new InetSocketAddress("127.0.0.1", portLE2 + 1000), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(3), new QuorumServer(3, + peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000), new InetSocketAddress("127.0.0.1", portLE3 + 1000), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(4), new QuorumServer(4, + peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000), new InetSocketAddress("127.0.0.1", portLE4 + 1000), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(5), new QuorumServer(5, + peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000), new InetSocketAddress("127.0.0.1", portLE5 + 1000), LearnerType.PARTICIPANT)); + } - LOG.info("creating QuorumPeer 1 port " + port1); - s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit); - assertEquals(port1, s1.getClientPort()); - LOG.info("creating QuorumPeer 2 port " + port2); - s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit); - assertEquals(port2, s2.getClientPort()); - LOG.info("creating QuorumPeer 3 port " + port3); - s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit); - assertEquals(port3, s3.getClientPort()); - LOG.info("creating QuorumPeer 4 port " + port4); - s4 = new 
QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit); - assertEquals(port4, s4.getClientPort()); - LOG.info("creating QuorumPeer 5 port " + port5); - s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit); - assertEquals(port5, s5.getClientPort()); + switch(i){ + case 1: + LOG.info("creating QuorumPeer 1 port " + port1); + s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit); + assertEquals(port1, s1.getClientPort()); + break; + case 2: + LOG.info("creating QuorumPeer 2 port " + port2); + s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit); + assertEquals(port2, s2.getClientPort()); + break; + case 3: + LOG.info("creating QuorumPeer 3 port " + port3); + s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit); + assertEquals(port3, s3.getClientPort()); + break; + case 4: + LOG.info("creating QuorumPeer 4 port " + port4); + s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit); + assertEquals(port4, s4.getClientPort()); + break; + case 5: + LOG.info("creating QuorumPeer 5 port " + port5); + s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit); + assertEquals(port5, s5.getClientPort()); + } } @After Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=966984&r1=966983&r2=966984&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java Fri Jul 23 06:31:57 2010 @@ -19,6 +19,9 @@ package org.apache.zookeeper.test; import java.io.IOException; import 
java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Semaphore; + import org.apache.log4j.Logger; import org.apache.zookeeper.AsyncCallback; @@ -260,5 +263,89 @@ public class QuorumTest extends QuorumBa } zk.close(); } + + /** + * Tests if closeSession can be logged before a leader gets established, which + * could lead to a locked-out follower (see ZOOKEEPER-790). + * + * The test works as follows. It has a client connecting to a follower f and + * sending batches of 1,000 updates. The goal is that f has a zxid higher than + * all other servers in the initial leader election. This way we can crash and + * recover the follower so that the follower believes it is the leader once it + * recovers (LE optimization: once a server receives a message from all other + * servers, it picks a leader). + * + * It also makes the session timeout very short so that we force the false + * leader to close the session and write it to the log in the buggy code (before + * ZOOKEEPER-790). Once f drops leadership and finds the current leader, its epoch + * is higher, and it rejects the leader. Now, if we prevent the leader from closing + * the session by only starting up (see Leader.lead()) once it obtains a quorum of + * supporters, then f will find the current leader and support it because it won't + * have a higher epoch. 
+ * + */ + @Test + public void testNoLogBeforeLeaderEstablishment () + throws IOException, InterruptedException, KeeperException{ + final Semaphore sem = new Semaphore(0); + + Leader leader = qb.s1.leader; + if (leader == null) leader = qb.s2.leader; + if (leader == null) leader = qb.s3.leader; + if (leader == null) leader = qb.s4.leader; + if (leader == null) leader = qb.s5.leader; + + assertNotNull(leader); + + int serverPort = qb.s1.getClientPort(); + if(qb.s1.leader != null){ + serverPort = qb.s2.getClientPort(); + } + + ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 1000, new Watcher() { + public void process(WatchedEvent event) { + }}); + + zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + for(int i = 0; i < 50000; i++) { + zk.setData("/blah", new byte[0], -1, new AsyncCallback.StatCallback() { + public void processResult(int rc, String path, Object ctx, + Stat stat) { + counter++; + if (rc != 0) { + errors++; + } + if(counter == 20000){ + sem.release(); + } + } + }, null); + + if(i == 5000){ + qb.shutdown(qb.s1); + LOG.info("Shutting down s1"); + } + if(i == 12000){ + qb.setupServer(1); + qb.s1.start(); + LOG.info("Setting up s1"); + } + if((i % 1000) == 0){ + Thread.sleep(500); + } + } + + // Wait until all updates return + sem.tryAcquire(15000, TimeUnit.MILLISECONDS); + + // Verify that server is following and has the same epoch as the leader + assertTrue("Not following", qb.s1.follower != null); + long epochF = (qb.s1.getActiveServer().getZxid() >> 32L); + long epochL = (leader.getEpoch() >> 32L); + assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() + + "Current epoch: " + epochF, epochF == epochL); + + } // skip superhammer and clientcleanup as they are too expensive for quorum }