Author: fpj
Date: Tue Oct 14 09:44:58 2008
New Revision: 704578

URL: http://svn.apache.org/viewvc?rev=704578&view=rev
Log:
ZOOKEEPER-185


Modified:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=704578&r1=704577&r2=704578&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java 
(original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java 
Tue Oct 14 09:44:58 2008
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Random;
 
 import org.apache.log4j.Logger;
@@ -41,18 +42,40 @@
 public class FLETest extends TestCase {
     protected static final Logger LOG = Logger.getLogger(FLETest.class);
 
+    class TestVote{ // One recorded vote: which peer thread cast it and which leader id it named.
+       TestVote(int id, long leader){
+               this.leader = leader;
+               this.id = id;
+       }
+
+       long leader; // Leader id this vote supports; countVotes() compares against this field.
+       int id; // Index of the LEThread that cast the vote (callers pass their thread index i).
+    }
+ 
+    int countVotes(HashSet<TestVote> hs, long id){ // Returns how many votes in hs name `id` as leader.
+       int counter = 0;
+       for(TestVote v : hs){ // hs must be non-null: a null set would throw here.
+          if(v.leader == id) counter++;
+        }
+
+       return counter;
+    }
+
     int count;
     int baseport;
     int baseLEport;
     HashMap<Long,QuorumServer> peers; 
     ArrayList<LEThread> threads;
+    HashMap<Integer, HashSet<TestVote> > voteMap;
     File tmpdir[];
     int port[];
+    int successCount;
+    Object finalObj;
     
     volatile Vote votes[];
     volatile boolean leaderDies;
     volatile long leader = -1;
-    volatile int round = 1; 
+    //volatile int round = 1; 
     Random rand = new Random();
     
     @Override
@@ -63,9 +86,12 @@
         
         peers = new HashMap<Long,QuorumServer>(count);
         threads = new ArrayList<LEThread>(count);
+        voteMap = new HashMap<Integer, HashSet<TestVote> >();
         votes = new Vote[count];
         tmpdir = new File[count];
         port = new int[count];
+        successCount = 0;
+        finalObj = new Object();
         
         QuorumStats.registerAsConcrete();
         LOG.info("SetUp " + getName());
@@ -83,7 +109,7 @@
         FastLeaderElection le;
         int i;
         QuorumPeer peer;
-    int peerRound = 1;
+        //int peerRound = 1;
 
         LEThread(QuorumPeer peer, int i) {
             this.i = i;
@@ -94,47 +120,140 @@
             try {
                 Vote v = null;
                 while(true) {
-            peer.setPeerState(ServerState.LOOKING);
-            LOG.info("Going to call leader election again.");
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again.");
                     v = peer.getElectionAlg().lookForLeader();
                     if(v == null){ 
                         LOG.info("Thread " + i + " got a null vote");
                         break;
                     }
-            peer.setCurrentVote(v);
+                    
+                    /*
+                     * A real zookeeper would take care of setting the current 
vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
             
                     LOG.info("Finished election: " + i + ", " + v.id);
                     votes[i] = v;
+                    
+                    /*
+                     * Get the current value of the logical clock for this 
peer.
+                     */
+                    int lc = (int) ((FastLeaderElection) 
peer.getElectionAlg()).getLogicalClock();
+                    
                     if (v.id == ((long) i)) {
-                        LOG.debug("I'm the leader");
+                        /*
+                         * A leader executes this part of the code. If it is 
the first leader to be 
+                         * elected, then it fails right after. Otherwise, it 
waits until it has enough
+                         * followers supporting it.
+                         */
+                        LOG.info("I'm the leader: " + i);
                         synchronized(FLETest.this) {
                             if (leaderDies) {
-                                LOG.debug("Leader " + i + " dying");
+                                LOG.info("Leader " + i + " dying");
                                 leaderDies = false;
                                 ((FastLeaderElection) 
peer.getElectionAlg()).shutdown();
                                 leader = -1;
-                                LOG.debug("Leader " + i + " dead");
+                                LOG.info("Leader " + i + " dead");
+                                
+                                //round++; 
+                                FLETest.this.notifyAll();
+                                
+                                break;
+                                
                             } else {
-                                leader = i; 
+                                synchronized(voteMap){
+                                    if(voteMap.get(lc) == null)
+                                        voteMap.put(lc, new 
HashSet<TestVote>());
+                                    HashSet<TestVote> hs = voteMap.get(lc);
+                                    hs.add(new TestVote(i, v.id));
+                                    
+                                    if(countVotes(hs, v.id) > (count/2)){
+                                        leader = i;
+                                        LOG.info("Got majority: " + i);   
+                                    } else {
+                                        voteMap.wait(3000);
+                                        LOG.info("Notified or expired: " + i);
+                                        hs = voteMap.get(lc);
+                                        if(countVotes(hs, v.id) > (count/2)){
+                                            leader = i;
+                                            LOG.info("Got majority: " + i);
+                                        } else {
+                                            //round++; 
+                                        }
+                                    }
+                                }
+                                FLETest.this.notifyAll();
+
+                                if(leader == i){
+                                    synchronized(finalObj){
+                                        successCount++;
+                                        if(successCount > (count/2)) 
finalObj.notify();
+                                    }
+                                    
+                                    break;
+                                }
                             }
-                round++; 
-                            FLETest.this.notifyAll();
                         }
-                        break;
-                    }
-                    synchronized(FLETest.this) {
-                        if (round == ((FastLeaderElection) 
peer.getElectionAlg()).getLogicalClock()) {
-                int tmp_round = round;
-                            FLETest.this.wait(1000);
-                if(tmp_round == round) round++;
+                    } else {
+                        /*
+                         * Followers execute this part. They first add their 
vote to voteMap, and then 
+                         * they wait for a bounded amount of time. A leader 
notifies followers through the
+                         * FLETest.this object.
+                         * 
+                         * Note that I can get FLETest.this, and then voteMap 
before adding the vote of
+                         * a follower, otherwise a follower would be blocked 
out until the leader notifies
+                         * or leaves the synchronized block on FLETest.this.
+                         */
+                        
+                        
+                        LOG.info("Logical clock " + ((FastLeaderElection) 
peer.getElectionAlg()).getLogicalClock());
+                        synchronized(voteMap){
+                            LOG.info("Voting on " + votes[i].id + ", round " + 
((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
+                            if(voteMap.get(lc) == null)
+                                voteMap.put(lc, new HashSet<TestVote>());
+                            HashSet<TestVote> hs = voteMap.get(lc);    
+                            hs.add(new TestVote(i, votes[i].id)); 
+                            if(countVotes(hs, votes[i].id) > (count/2)){
+                                LOG.info("Logical clock: " + lc + ", " + 
votes[i].id);
+                                voteMap.notify();
+                            }    
                         }
-            LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
-                        if (leader == votes[i].id) {
-                            break;
+                        
+                        /*
+                         * In this part a follower waits until the leader 
notifies it, and removes its
+                         * vote if the leader takes too long to respond.
+                         */
+                        synchronized(FLETest.this){
+                            if (leader != votes[i].id) FLETest.this.wait(3000);
+                        
+                            LOG.info("The leader: " + leader + " and my vote " 
+ votes[i].id);
+                            synchronized(voteMap){ 
+                                if (leader == votes[i].id) {
+                                    synchronized(finalObj){
+                                        successCount++;
+                                        if(successCount > (count/2)) 
finalObj.notify();
+                                    }
+                                    break;
+                                } else {
+                                    HashSet<TestVote> hs = voteMap.get(lc);
+                                    TestVote toRemove = null;
+                                    for(TestVote tv : hs){
+                                        if(v.id == i){
+                                            toRemove = tv;
+                                            break;
+                                        }
+                                    }
+                                    hs.remove(toRemove);
+                                }
+                            }
                         }
-            peerRound++;
                     }
-                    Thread.sleep(rand.nextInt(1000));
+                    /*
+                     * Add some randomness to the execution.
+                     */
+                    Thread.sleep(rand.nextInt(500));
                     peer.setCurrentVote(new Vote(peer.getId(), 0));
                 }
                 LOG.debug("Thread " + i + " votes " + v);
@@ -162,32 +281,41 @@
         for(int i = 0; i < le.length; i++) {
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], 
port[i], 3, i, 2, 2, 2);
             peer.startLeaderElection();
-            //le[i] = new FastLeaderElection(peer, new QuorumCnxManager(peer));
             LEThread thread = new LEThread(peer, i);
             thread.start();
             threads.add(thread);
         }
         LOG.info("Started threads " + getName());
         
-       for(int i = 0; i < threads.size(); i++) {
-            threads.get(i).join(20000);
-            if (threads.get(i).isAlive()) {
-                fail("Threads didn't join: " + i);
+        
+        int waitCounter = 0;
+        synchronized(finalObj){
+            while((successCount <= count/2) && (waitCounter < 50)){
+                finalObj.wait(2000);
+                waitCounter++;
             }
         }
-        long id = votes[0].id;
-        for(int i = 1; i < votes.length; i++) {
-            if (votes[i] == null) {
-                fail("Thread " + i + " had a null vote");
-            }
-        LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id); 
-            if (votes[i].id != id) {
-                if (allowOneBadLeader && votes[i].id == i) {
-                    allowOneBadLeader = false;
-                } else {
-                    fail("Thread " + i + " got " + votes[i].id + " expected " 
+ id);
-                }
+        
+       /*
+        * Lists what threads haven't joined. A thread doesn't join if it 
hasn't decided
+        * upon a leader yet. It can happen that a peer is slow or 
disconnected, and it can
+        * take longer to nominate and connect to the current leader.
+        */
+       for(int i = 0; i < threads.size(); i++) {
+            if (threads.get(i).isAlive()) {
+                LOG.info("Threads didn't join: " + i);
             }
         }
+       
+       /*
+        * If we have a majority, then we are good to go.
+        */
+       if(successCount <= count/2){
+           fail("Fewer than a a majority has joined");
+       }
+       
+       if(threads.get((int) leader).isAlive()){
+           fail("Leader hasn't joined: " + leader);
+       }
     }
 }


Reply via email to