Author: breed
Date: Fri Jul 23 06:31:57 2010
New Revision: 966984

URL: http://svn.apache.org/viewvc?rev=966984&view=rev
Log:
ZOOKEEPER-790. Last processed zxid set prematurely while establishing 
leadership (fpj via breed)


Modified:
    hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
    
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
    
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Fri Jul 23 06:31:57 2010
@@ -58,6 +58,8 @@ BUGFIXES:
   ZOOKEEPER-766. forrest recipes docs don't mention the lock/queue recipe
   implementations available in the release (phunt via mahadev)
 
+  ZOOKEEPER-790. Last processed zxid set prematurely while establishing 
leadership (fpj via breed)
+
 Release 3.3.0 - 2010-03-24
 
 Non-backward compatible changes:

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
 Fri Jul 23 06:31:57 2010
@@ -193,7 +193,12 @@ public class FileTxnLog implements TxnLo
                         + hdr.getType());
             }
             if (logStream==null) {
-               logFileWrite = new File(logDir, ("log." +
+               if(LOG.isInfoEnabled()){
+                    LOG.info("Creating new log file: log." +  
+                            Long.toHexString(hdr.getZxid()));
+               }
+               
+               logFileWrite = new File(logDir, ("log." + 
                        Long.toHexString(hdr.getZxid())));
                fos = new FileOutputStream(logFileWrite);
                logStream=new BufferedOutputStream(fos);

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 Fri Jul 23 06:31:57 2010
@@ -274,16 +274,15 @@ public class Leader {
         try {
             self.tick = 0;
             zk.loadData();
-            zk.startup();
+            
             long epoch = self.getLastLoggedZxid() >> 32L;
             epoch++;
             zk.setZxid(epoch << 32L);
-            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
             
             synchronized(this){
                 lastProposed = zk.getZxid();
             }
-                        
+            
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, 
zk.getZxid(),
                     null, null);
 
@@ -327,6 +326,13 @@ public class Leader {
                 Thread.sleep(self.tickTime);
                 self.tick++;
             }
+            
+            if(LOG.isInfoEnabled()){
+                LOG.info("Have quorum of supporters; starting up and setting 
last processed zxid: " + zk.getZxid());
+            }
+            zk.startup();
+            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+            
             if (!System.getProperty("zookeeper.leaderServes", 
"yes").equals("no")) {
                 self.cnxnFactory.setZooKeeperServer(zk);
             }
@@ -466,7 +472,7 @@ public class Leader {
             LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
                     + " is " + p.ackSet.size());
         }
-        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){        
+        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
             if (zxid != lastCommitted+1) {
                 LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                         + " from " + followerAddr + " not first!");
@@ -603,6 +609,16 @@ public class Leader {
 
     long lastProposed;
 
+    
+    /**
+     * Returns the current epoch of the leader.
+     * 
+     * @return
+     */
+    public long getEpoch(){
+        return lastProposed >> 32L;
+    }
+    
     /**
      * create a proposal and send it out to all the members
      * 

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
 Fri Jul 23 06:31:57 2010
@@ -311,6 +311,9 @@ public class Learner {       
                 System.exit(13);
 
             }
+            if(LOG.isInfoEnabled()){
+                LOG.info("Setting leader epoch " + 
Long.toHexString(newLeaderZxid >> 32L));
+            }
             zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
         }
         ack.setZxid(newLeaderZxid & ~0xffffffffL);

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumBase.java
 Fri Jul 23 06:31:57 2010
@@ -211,48 +211,72 @@ public class QuorumBase extends ClientBa
         }
         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
     }
-    public void setupServers() throws IOException {
+   
+    public void setupServers() throws IOException {        
+        setupServer(1);
+        setupServer(2);
+        setupServer(3);
+        setupServer(4);
+        setupServer(5);
+    }
+    
+    HashMap<Long,QuorumServer> peers = null;
+    public void setupServer(int i) throws IOException {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
-        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        
+        if(peers == null){
+            peers = new HashMap<Long,QuorumServer>();
 
-        peers.put(Long.valueOf(1), new QuorumServer(1, 
+            peers.put(Long.valueOf(1), new QuorumServer(1, 
                 new InetSocketAddress("127.0.0.1", port1 + 1000),
                 new InetSocketAddress("127.0.0.1", portLE1 + 1000),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(2), new QuorumServer(2, 
+            peers.put(Long.valueOf(2), new QuorumServer(2, 
                 new InetSocketAddress("127.0.0.1", port2 + 1000),
                 new InetSocketAddress("127.0.0.1", portLE2 + 1000),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(3), new QuorumServer(3, 
+            peers.put(Long.valueOf(3), new QuorumServer(3, 
                 new InetSocketAddress("127.0.0.1", port3 + 1000),
                 new InetSocketAddress("127.0.0.1", portLE3 + 1000),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(4), new QuorumServer(4, 
+            peers.put(Long.valueOf(4), new QuorumServer(4, 
                 new InetSocketAddress("127.0.0.1", port4 + 1000),
                 new InetSocketAddress("127.0.0.1", portLE4 + 1000),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(5), new QuorumServer(5, 
+            peers.put(Long.valueOf(5), new QuorumServer(5, 
                 new InetSocketAddress("127.0.0.1", port5 + 1000),
                 new InetSocketAddress("127.0.0.1", portLE5 + 1000),
                 LearnerType.PARTICIPANT));
+        }
         
-        LOG.info("creating QuorumPeer 1 port " + port1);
-        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, 
initLimit, syncLimit);
-        assertEquals(port1, s1.getClientPort());
-        LOG.info("creating QuorumPeer 2 port " + port2);
-        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, 
initLimit, syncLimit);
-        assertEquals(port2, s2.getClientPort());
-        LOG.info("creating QuorumPeer 3 port " + port3);
-        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, 
initLimit, syncLimit);
-        assertEquals(port3, s3.getClientPort());
-        LOG.info("creating QuorumPeer 4 port " + port4);
-        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, 
initLimit, syncLimit);
-        assertEquals(port4, s4.getClientPort());
-        LOG.info("creating QuorumPeer 5 port " + port5);
-        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, 
initLimit, syncLimit);
-        assertEquals(port5, s5.getClientPort());
+        switch(i){
+        case 1:
+            LOG.info("creating QuorumPeer 1 port " + port1);
+            s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, 
initLimit, syncLimit);
+            assertEquals(port1, s1.getClientPort());
+            break;
+        case 2:
+            LOG.info("creating QuorumPeer 2 port " + port2);
+            s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, 
initLimit, syncLimit);
+            assertEquals(port2, s2.getClientPort());
+            break;
+        case 3:  
+            LOG.info("creating QuorumPeer 3 port " + port3);
+            s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, 
initLimit, syncLimit);
+            assertEquals(port3, s3.getClientPort());
+            break;
+        case 4:
+            LOG.info("creating QuorumPeer 4 port " + port4);
+            s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, 
initLimit, syncLimit);
+            assertEquals(port4, s4.getClientPort());
+            break;
+        case 5:
+            LOG.info("creating QuorumPeer 5 port " + port5);
+            s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, 
initLimit, syncLimit);
+            assertEquals(port5, s5.getClientPort());
+        }
     }
 
     @After

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=966984&r1=966983&r2=966984&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumTest.java
 Fri Jul 23 06:31:57 2010
@@ -19,6 +19,9 @@
 package org.apache.zookeeper.test;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback;
@@ -260,5 +263,89 @@ public class QuorumTest extends QuorumBa
         }
         zk.close();
     }
+    
+    /**
+     * Tests if closeSession can be logged before a leader gets established, 
which
+     * could lead to a locked-out follower (see ZOOKEEPER-790). 
+     * 
+     * The test works as follows. It has a client connecting to a follower f 
and
+     * sending batches of 1,000 updates. The goal is that f has a zxid higher 
than
+     * all other servers in the initial leader election. This way we can crash 
and
+     * recover the follower so that the follower believes it is the leader 
once it
+     * recovers (LE optimization: once a server receives a message from all 
other 
+     * servers, it picks a leader).
+     * 
+     * It also makes the session timeout very short so that we force the false 
+     * leader to close the session and write it to the log in the buggy code 
(before 
+     * ZOOKEEPER-790). Once f drops leadership and finds the current leader, 
its epoch
+     * is higher, and it rejects the leader. Now, if we prevent the leader 
from closing
+     * the session by only starting up (see Leader.lead()) once it obtains a 
quorum of 
+     * supporters, then f will find the current leader and support it because 
it won't
+     * have a higher epoch.
+     * 
+     */
+    @Test
+    public void testNoLogBeforeLeaderEstablishment () 
+    throws IOException, InterruptedException, KeeperException{
+        final Semaphore sem = new Semaphore(0);
+                
+        Leader leader = qb.s1.leader;
+        if (leader == null) leader = qb.s2.leader;
+        if (leader == null) leader = qb.s3.leader;
+        if (leader == null) leader = qb.s4.leader;
+        if (leader == null) leader = qb.s5.leader;
+        
+        assertNotNull(leader);
+        
+        int serverPort = qb.s1.getClientPort();
+        if(qb.s1.leader != null){
+            serverPort = qb.s2.getClientPort();
+        }
+        
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + serverPort, 
1000, new Watcher() {
+            public void process(WatchedEvent event) {
+        }});
+
+        zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);      
+        
+        for(int i = 0; i < 50000; i++) {
+            zk.setData("/blah", new byte[0], -1, new 
AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                    if(counter == 20000){
+                        sem.release();
+                    }
+                }
+            }, null);
+            
+            if(i == 5000){
+                qb.shutdown(qb.s1);
+                LOG.info("Shutting down s1");
+            }
+            if(i == 12000){
+                qb.setupServer(1);
+                qb.s1.start();
+                LOG.info("Setting up s1");
+            }
+            if((i % 1000) == 0){
+                Thread.sleep(500);
+            }
+        }
+
+        // Wait until all updates return
+        sem.tryAcquire(15000, TimeUnit.MILLISECONDS);
+        
+        // Verify that server is following and has the same epoch as the leader
+        assertTrue("Not following", qb.s1.follower != null);
+        long epochF = (qb.s1.getActiveServer().getZxid() >> 32L);
+        long epochL = (leader.getEpoch() >> 32L);
+        assertTrue("Zxid: " + qb.s1.getActiveServer().getZxid() + 
+                "Current epoch: " + epochF, epochF == epochL);
+        
+    }
        // skip superhammer and clientcleanup as they are too expensive for 
quorum
 }


Reply via email to