Author: mahadev
Date: Fri Nov 20 22:24:36 2009
New Revision: 882739

URL: http://svn.apache.org/viewvc?rev=882739&view=rev
Log:
ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created outside of normal processing (ben reed and mahadev via mahadev)

Added: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java Modified: hadoop/zookeeper/branches/branch-3.1/CHANGES.txt hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java Modified: hadoop/zookeeper/branches/branch-3.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/CHANGES.txt?rev=882739&r1=882738&r2=882739&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.1/CHANGES.txt Fri Nov 20 22:24:36 2009 @@ -7,6 +7,8 @@ ZOOKEEPER-562. c client can flood server with pings if tcp send queue filled. (ben reed via mahadev) + ZOOKEEPER-582. 
ZooKeeper can revert to old data when a snapshot is created + outside of normal processing (ben reed and mahadev via mahadev) Release 3.1.1 - 2009-03-17 Modified: hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=882739&r1=882738&r2=882739&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original) +++ hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Nov 20 22:24:36 2009 @@ -172,7 +172,16 @@ writePacket(qp); readPacket(qp); long newLeaderZxid = qp.getZxid(); - + + //check to see if the leader zxid is lower than ours + //this should never happen but is just a safety check + long lastLoggedZxid = sentLastZxid; + if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) { + LOG.fatal("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L) + + " is less than our zxid " + Long.toHexString(lastLoggedZxid >> 32L)); + throw new IOException("Error: Epoch of leader is lower"); + } + if (qp.getType() != Leader.NEWLEADER) { LOG.error("First packet should have been NEWLEADER"); throw new IOException("First packet should have been NEWLEADER"); Modified: hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=882739&r1=882738&r2=882739&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ 
hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Nov 20 22:24:36 2009 @@ -34,6 +34,7 @@ import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.persistence.Util; /** * This class manages the quorum protocol. There are three states this server @@ -327,9 +328,35 @@ new NIOServerCnxn.Factory(clientPort)); } - public long getLastLoggedZxid(){ - return logFactory.getLastLoggedZxid(); + /** + * returns the highest zxid that this host has seen + * + * @return the highest zxid for this host + */ + public long getLastLoggedZxid() { + /* + * it is possible to have the last zxid with just a snapshot and no log + * related to it. one example is during upgrade wherein the there is no + * corresponding log to the snapshot. in that case just use the snapshot + * zxid + */ + + File lastSnapshot = null; + long maxZxid = -1L; + long maxLogZxid = logFactory.getLastLoggedZxid(); + try { + lastSnapshot = logFactory.findMostRecentSnapshot(); + if (lastSnapshot != null) { + maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(), + "snapshot"), maxLogZxid); + } + } catch (IOException ie) { + LOG.warn("Exception finding last snapshot ", ie); + maxZxid = maxLogZxid; + } + return maxZxid; } + public Follower follower; public Leader leader; Modified: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=882739&r1=882738&r2=882739&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java 
Fri Nov 20 22:24:36 2009 @@ -140,6 +140,8 @@ public static boolean waitForServerUp(String hp, long timeout) { long start = System.currentTimeMillis(); + // if there are multiple host ports just take the first one + hp = hp.split(",")[0]; String split[] = hp.split(":"); String host = split[0]; int port = Integer.parseInt(split[1]); Modified: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=882739&r1=882738&r2=882739&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original) +++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Nov 20 22:24:36 2009 @@ -99,16 +99,50 @@ LOG.info(hp + " is accepting client connections"); } } + + public void setUpServers() throws Exception { + int tickTime = 2000; + int initLimit = 3; + int syncLimit = 3; + + HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(); + peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181))); + peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182))); + peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183))); + peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184))); + peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185))); + + LOG.info("creating QuorumPeer 1"); + s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, initLimit, syncLimit); + assertEquals(2181, s1.getClientPort()); + LOG.info("creating QuorumPeer 2"); + s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, initLimit, syncLimit); + assertEquals(2182, s2.getClientPort()); + LOG.info("creating 
QuorumPeer 3"); + s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, initLimit, syncLimit); + assertEquals(2183, s3.getClientPort()); + LOG.info("creating QuorumPeer 4"); + s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, initLimit, syncLimit); + assertEquals(2184, s4.getClientPort()); + LOG.info("creating QuorumPeer 5"); + s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, initLimit, syncLimit); + assertEquals(2185, s5.getClientPort()); + } - @After - @Override - protected void tearDown() throws Exception { - LOG.info("TearDown started"); + public void shutdownServers() throws Exception { shutdown(s1); shutdown(s2); shutdown(s3); shutdown(s4); shutdown(s5); + } + + + @After + @Override + protected void tearDown() throws Exception { + LOG.info("TearDown started"); + shutdownServers(); for (String hp : hostPort.split(",")) { assertTrue("waiting for server down", Added: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java?rev=882739&view=auto ============================================================================== --- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java (added) +++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java Fri Nov 20 22:24:36 2009 @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; + +import junit.framework.TestCase; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test what happens when logs fall behind the snapshots or a follower has a + * higher epoch than a leader. + */ +public class QuorumZxidSyncTest extends TestCase { + QuorumBase qb = new QuorumBase(); + + @Before + @Override + protected void setUp() throws Exception { + qb.setUp(); + } + + @Test + /** + * find out what happens when a follower connects to leader that is behind + */ + public void testBehindLeader() throws Exception { + // crank up the epoch numbers + ClientBase.waitForServerUp(qb.hostPort, 10000); + ClientBase.waitForServerUp(qb.hostPort, 10000); + ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.tearDown(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.shutdownServers(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.shutdownServers(); + deleteFiles(qb.s1dir); + deleteFiles(qb.s2dir); + deleteFiles(qb.s3dir); + deleteFiles(qb.s4dir); + qb.setUpServers(); + qb.s1.start(); + qb.s2.start(); + qb.s3.start(); + qb.s4.start(); + assertTrue("Servers didn't come up", ClientBase.waitForServerUp(qb.hostPort, 10000)); + qb.s5.start(); + String hostPort = "127.0.0.1:" + qb.s5.getClientPort(); + assertFalse("Servers came up, but shouldn't have since it's ahead of leader", + ClientBase.waitForServerUp(hostPort, 10000)); + } + + + private void deleteFiles(File f) { + File v = new File(f, "version-2"); + for(File c: v.listFiles()) { + c.delete(); + } + } + + @Test + /** + * find out what happens when the latest state is in the snapshots not + * the logs. 
+ */ + public void testLateLogs() throws Exception { + // crank up the epoch numbers + ClientBase.waitForServerUp(qb.hostPort, 10000); + ClientBase.waitForServerUp(qb.hostPort, 10000); + ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.tearDown(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.tearDown(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.tearDown(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + qb.tearDown(); + deleteLogs(qb.s1dir); + deleteLogs(qb.s2dir); + deleteLogs(qb.s3dir); + deleteLogs(qb.s4dir); + deleteLogs(qb.s5dir); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + qb.tearDown(); + qb.startServers(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent event) { + }}); + boolean saw2 = false; + for(String child: zk.getChildren("/", false)) { + if (child.equals("2")) { + saw2 = true; + } + } + zk.close(); + assertTrue("Didn't see /2 (went back in time)", saw2); + } + + private void deleteLogs(File f) { + File v = new File(f, "version-2"); + for(File c: v.listFiles()) { + if (c.getName().startsWith("log")) { + c.delete(); + } + } + } + + @After + @Override + public void tearDown() throws Exception { + qb.tearDown(); + } +}