Author: mahadev
Date: Fri Nov 20 22:27:29 2009
New Revision: 882742

URL: http://svn.apache.org/viewvc?rev=882742&view=rev
Log:
ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created 
outside of normal processing (ben reed and mahadev via mahadev)

Added:
    
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
Modified:
    hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
    
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
    
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumBase.java

Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=882742&r1=882741&r2=882742&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Fri Nov 20 22:27:29 2009
@@ -28,10 +28,13 @@
   (mahadev via breed)
 
   ZOOKEEPER-585. Update README for zkpython in 3.2.2 (henry robinson via phunt)
-
+  
   ZOOKEEPER-576. docs need to be updated for session moved exception and how
   to handle it (breed via phunt)
 
+  ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created
+  outside of normal processing (ben reed and mahadev via mahadev)
+
 Release 3.2.1 - 2009-08-27
 
 Backward compatible changes:

Modified: 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=882742&r1=882741&r2=882742&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 Fri Nov 20 22:27:29 2009
@@ -192,7 +192,16 @@
                 writePacket(qp, true);
                 readPacket(qp);
                 long newLeaderZxid = qp.getZxid();
-    
+                
+                //check to see if the leader's epoch (high 32 bits of its zxid) is lower
+                //than ours. this should never happen but is just a safety check
+                long lastLoggedZxid = sentLastZxid;
+                if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) {
+                    LOG.fatal("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L)
+                            + " is less than our zxid " + Long.toHexString(lastLoggedZxid >> 32L));
+                    throw new IOException("Error: Epoch of leader is lower");
+                }
+
                 if (qp.getType() != Leader.NEWLEADER) {
                     LOG.error("First packet should have been NEWLEADER");
                     throw new IOException("First packet should have been NEWLEADER");

Modified: 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=882742&r1=882741&r2=882742&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
 Fri Nov 20 22:27:29 2009
@@ -34,6 +34,7 @@
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 
@@ -375,9 +376,35 @@
                 new NIOServerCnxn.Factory(clientPort), quorumConfig);
     }
     
-    public long getLastLoggedZxid(){
-        return logFactory.getLastLoggedZxid();
+    /**
+     * returns the highest zxid that this host has seen
+     * 
+     * @return the highest zxid for this host
+     */
+    public long getLastLoggedZxid() {
+        /*
+         * it is possible to have the last zxid with just a snapshot and no log
+         * related to it. one example is during upgrade wherein there is no
+         * corresponding log to the snapshot. in that case just use the snapshot
+         * zxid
+         */
+
+        File lastSnapshot = null;
+        long maxZxid = -1L;
+        long maxLogZxid = logFactory.getLastLoggedZxid();
+        try {
+            lastSnapshot = logFactory.findMostRecentSnapshot();
+            if (lastSnapshot != null) {
+                maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(),
+                        "snapshot"), maxLogZxid);
+            }
+        } catch (IOException ie) {
+            LOG.warn("Exception finding last snapshot ", ie);
+            maxZxid = maxLogZxid;
+        }
+        return maxZxid;
     }
+    
     public Follower follower;
     public Leader leader;
 

Modified: 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=882742&r1=882741&r2=882742&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
 Fri Nov 20 22:27:29 2009
@@ -155,6 +155,8 @@
 
     public static boolean waitForServerUp(String hp, long timeout) {
         long start = System.currentTimeMillis();
+        // if there are multiple host ports just take the first one
+        hp = hp.split(",")[0];
         String split[] = hp.split(":");
         String host = split[0];
         int port = Integer.parseInt(split[1]);

Modified: 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=882742&r1=882741&r2=882742&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumBase.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumBase.java
 Fri Nov 20 22:27:29 2009
@@ -36,13 +36,12 @@
 
     File s1dir, s2dir, s3dir, s4dir, s5dir;
     QuorumPeer s1, s2, s3, s4, s5;
-
+    
     protected void setUp() throws Exception {
         LOG.info("STARTING " + getName());
         setupTestEnv();
 
-        JMXEnv.setUp();
-
+        
         hostPort = 
"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
 
         s1dir = ClientBase.createTmpDir();
@@ -55,10 +54,13 @@
 
         LOG.info("Setup finished");
     }
+    
     void startServers() throws Exception {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        JMXEnv.setUp();
+                
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1, new 
InetSocketAddress("127.0.0.1", 3181)));
         peers.put(Long.valueOf(2), new QuorumServer(2, new 
InetSocketAddress("127.0.0.1", 3182)));
@@ -92,13 +94,13 @@
         LOG.info("start QuorumPeer 5");
         s5.start();
         LOG.info("started QuorumPeer 5");
-
         for (String hp : hostPort.split(",")) {
             assertTrue("waiting for server up",
                        ClientBase.waitForServerUp(hp,
-                                    CONNECTION_TIMEOUT));
+                                                  CONNECTION_TIMEOUT));
             LOG.info(hp + " is accepting client connections");
         }
+        
 
         // interesting to see what's there...
         JMXEnv.dump();
@@ -122,16 +124,50 @@
         }
         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
     }
+    
+    public void setUpServers() throws Exception {
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+        
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1, new 
InetSocketAddress("127.0.0.1", 3181)));
+        peers.put(Long.valueOf(2), new QuorumServer(2, new 
InetSocketAddress("127.0.0.1", 3182)));
+        peers.put(Long.valueOf(3), new QuorumServer(3, new 
InetSocketAddress("127.0.0.1", 3183)));
+        peers.put(Long.valueOf(4), new QuorumServer(4, new 
InetSocketAddress("127.0.0.1", 3184)));
+        peers.put(Long.valueOf(5), new QuorumServer(5, new 
InetSocketAddress("127.0.0.1", 3185)));
 
-    @After
-    @Override
-    protected void tearDown() throws Exception {
-        LOG.info("TearDown started");
+        LOG.info("creating QuorumPeer 1");
+        s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, 
initLimit, syncLimit);
+        assertEquals(2181, s1.getClientPort());
+        LOG.info("creating QuorumPeer 2");
+        s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, 
initLimit, syncLimit);
+        assertEquals(2182, s2.getClientPort());
+        LOG.info("creating QuorumPeer 3");
+        s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, 
initLimit, syncLimit);
+        assertEquals(2183, s3.getClientPort());
+        LOG.info("creating QuorumPeer 4");
+        s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, 
initLimit, syncLimit);
+        assertEquals(2184, s4.getClientPort());
+        LOG.info("creating QuorumPeer 5");
+        s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, 
initLimit, syncLimit);
+        assertEquals(2185, s5.getClientPort());
+    }
+
+    public void shutdownServers() throws Exception {
         shutdown(s1);
         shutdown(s2);
         shutdown(s3);
         shutdown(s4);
         shutdown(s5);
+    }
+
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        LOG.info("TearDown started");
+        shutdownServers();
 
         for (String hp : hostPort.split(",")) {
             assertTrue("waiting for server down",

Added: 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java?rev=882742&view=auto
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
 (added)
+++ 
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
 Fri Nov 20 22:27:29 2009
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test what happens when logs fall behind the snapshots or a follower has a
+ * higher epoch than a leader.
+ */
+public class QuorumZxidSyncTest extends TestCase {
+    QuorumBase qb = new QuorumBase();
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        qb.setUp();
+    }
+    
+    @Test
+    /**
+     * find out what happens when a follower connects to a leader that is behind
+     */
+    public void testBehindLeader() throws Exception {
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.tearDown();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        deleteFiles(qb.s1dir);
+        deleteFiles(qb.s2dir);
+        deleteFiles(qb.s3dir);
+        deleteFiles(qb.s4dir);
+        qb.setUpServers();
+        qb.s1.start();
+        qb.s2.start();
+        qb.s3.start();
+        qb.s4.start();
+        assertTrue("Servers didn't come up", 
ClientBase.waitForServerUp(qb.hostPort, 10000));
+        qb.s5.start();
+        String hostPort = "127.0.0.1:" + qb.s5.getClientPort();
+        assertFalse("Servers came up, but shouldn't have since it's ahead of leader",
+                ClientBase.waitForServerUp(hostPort, 10000));
+    }
+    
+
+    private void deleteFiles(File f) {
+        File v = new File(f, "version-2");
+        for(File c: v.listFiles()) {
+            c.delete();
+        }
+    }
+    
+    @Test
+    /**
+     * find out what happens when the latest state is in the snapshots not
+     * the logs.
+     */
+    public void testLateLogs() throws Exception {
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.tearDown();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.tearDown();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.tearDown();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.tearDown();
+        deleteLogs(qb.s1dir);
+        deleteLogs(qb.s2dir);
+        deleteLogs(qb.s3dir);
+        deleteLogs(qb.s4dir);
+        deleteLogs(qb.s5dir);
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        qb.tearDown();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        boolean saw2 = false;
+        for(String child: zk.getChildren("/", false)) {
+            if (child.equals("2")) {
+                saw2 = true;
+            }
+        }
+        zk.close();
+        assertTrue("Didn't see /2 (went back in time)", saw2);
+    }
+    
+    private void deleteLogs(File f) {
+        File v = new File(f, "version-2");
+        for(File c: v.listFiles()) {
+            if (c.getName().startsWith("log")) {
+                c.delete();
+            }
+        }
+    }
+    
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        qb.tearDown();
+    }
+}


Reply via email to