Author: fpj Date: Tue Oct 14 09:44:58 2008 New Revision: 704578 URL: http://svn.apache.org/viewvc?rev=704578&view=rev Log: ZOOKEEPER-185
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=704578&r1=704577&r2=704578&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Tue Oct 14 09:44:58 2008 @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Random; import org.apache.log4j.Logger; @@ -41,18 +42,40 @@ public class FLETest extends TestCase { protected static final Logger LOG = Logger.getLogger(FLETest.class); + class TestVote{ + TestVote(int id, long leader){ + this.leader = leader; + this.id = id; + } + + long leader; + int id; + } + + int countVotes(HashSet<TestVote> hs, long id){ + int counter = 0; + for(TestVote v : hs){ + if(v.leader == id) counter++; + } + + return counter; + } + int count; int baseport; int baseLEport; HashMap<Long,QuorumServer> peers; ArrayList<LEThread> threads; + HashMap<Integer, HashSet<TestVote> > voteMap; File tmpdir[]; int port[]; + int successCount; + Object finalObj; volatile Vote votes[]; volatile boolean leaderDies; volatile long leader = -1; - volatile int round = 1; + //volatile int round = 1; Random rand = new Random(); @Override @@ -63,9 +86,12 @@ peers = new HashMap<Long,QuorumServer>(count); threads = new ArrayList<LEThread>(count); + voteMap = new HashMap<Integer, HashSet<TestVote> >(); votes = new Vote[count]; tmpdir = new File[count]; port = new int[count]; + successCount = 0; + finalObj = new Object(); QuorumStats.registerAsConcrete(); LOG.info("SetUp " + getName()); @@ -83,7 +109,7 @@ FastLeaderElection le; int i; QuorumPeer 
peer; - int peerRound = 1; + //int peerRound = 1; LEThread(QuorumPeer peer, int i) { this.i = i; @@ -94,47 +120,140 @@ try { Vote v = null; while(true) { - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election again."); + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election again."); v = peer.getElectionAlg().lookForLeader(); if(v == null){ LOG.info("Thread " + i + " got a null vote"); break; } - peer.setCurrentVote(v); + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); LOG.info("Finished election: " + i + ", " + v.id); votes[i] = v; + + /* + * Get the current value of the logical clock for this peer. + */ + int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock(); + if (v.id == ((long) i)) { - LOG.debug("I'm the leader"); + /* + * A leader executes this part of the code. If it is the first leader to be + * elected, then it fails right after. Otherwise, it waits until it has enough + * followers supporting it. 
+ */ + LOG.info("I'm the leader: " + i); synchronized(FLETest.this) { if (leaderDies) { - LOG.debug("Leader " + i + " dying"); + LOG.info("Leader " + i + " dying"); leaderDies = false; ((FastLeaderElection) peer.getElectionAlg()).shutdown(); leader = -1; - LOG.debug("Leader " + i + " dead"); + LOG.info("Leader " + i + " dead"); + + //round++; + FLETest.this.notifyAll(); + + break; + } else { - leader = i; + synchronized(voteMap){ + if(voteMap.get(lc) == null) + voteMap.put(lc, new HashSet<TestVote>()); + HashSet<TestVote> hs = voteMap.get(lc); + hs.add(new TestVote(i, v.id)); + + if(countVotes(hs, v.id) > (count/2)){ + leader = i; + LOG.info("Got majority: " + i); + } else { + voteMap.wait(3000); + LOG.info("Notified or expired: " + i); + hs = voteMap.get(lc); + if(countVotes(hs, v.id) > (count/2)){ + leader = i; + LOG.info("Got majority: " + i); + } else { + //round++; + } + } + } + FLETest.this.notifyAll(); + + if(leader == i){ + synchronized(finalObj){ + successCount++; + if(successCount > (count/2)) finalObj.notify(); + } + + break; + } } - round++; - FLETest.this.notifyAll(); } - break; - } - synchronized(FLETest.this) { - if (round == ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()) { - int tmp_round = round; - FLETest.this.wait(1000); - if(tmp_round == round) round++; + } else { + /* + * Followers execute this part. They first add their vote to voteMap, and then + * they wait for a bounded amount of time. A leader notifies followers through the + * FLETest.this object. + * + * Note that I cannot get FLETest.this, and then voteMap before adding the vote of + * a follower, otherwise a follower would be blocked out until the leader notifies + * or leaves the synchronized block on FLETest.this. 
+ */ + + + LOG.info("Logical clock " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()); + synchronized(voteMap){ + LOG.info("Voting on " + votes[i].id + ", round " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()); + if(voteMap.get(lc) == null) + voteMap.put(lc, new HashSet<TestVote>()); + HashSet<TestVote> hs = voteMap.get(lc); + hs.add(new TestVote(i, votes[i].id)); + if(countVotes(hs, votes[i].id) > (count/2)){ + LOG.info("Logical clock: " + lc + ", " + votes[i].id); + voteMap.notify(); + } } - LOG.info("The leader: " + leader + " and my vote " + votes[i].id); - if (leader == votes[i].id) { - break; + + /* + * In this part a follower waits until the leader notifies it, and removes its + * vote if the leader takes too long to respond. + */ + synchronized(FLETest.this){ + if (leader != votes[i].id) FLETest.this.wait(3000); + + LOG.info("The leader: " + leader + " and my vote " + votes[i].id); + synchronized(voteMap){ + if (leader == votes[i].id) { + synchronized(finalObj){ + successCount++; + if(successCount > (count/2)) finalObj.notify(); + } + break; + } else { + HashSet<TestVote> hs = voteMap.get(lc); + TestVote toRemove = null; + for(TestVote tv : hs){ + if(v.id == i){ + toRemove = tv; + break; + } + } + hs.remove(toRemove); + } + } } - peerRound++; } - Thread.sleep(rand.nextInt(1000)); + /* + * Add some randomness to the execution. 
+ */ + Thread.sleep(rand.nextInt(500)); peer.setCurrentVote(new Vote(peer.getId(), 0)); } LOG.debug("Thread " + i + " votes " + v); @@ -162,32 +281,41 @@ for(int i = 0; i < le.length; i++) { QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2); peer.startLeaderElection(); - //le[i] = new FastLeaderElection(peer, new QuorumCnxManager(peer)); LEThread thread = new LEThread(peer, i); thread.start(); threads.add(thread); } LOG.info("Started threads " + getName()); - for(int i = 0; i < threads.size(); i++) { - threads.get(i).join(20000); - if (threads.get(i).isAlive()) { - fail("Threads didn't join: " + i); + + int waitCounter = 0; + synchronized(finalObj){ + while((successCount <= count/2) && (waitCounter < 50)){ + finalObj.wait(2000); + waitCounter++; } } - long id = votes[0].id; - for(int i = 1; i < votes.length; i++) { - if (votes[i] == null) { - fail("Thread " + i + " had a null vote"); - } - LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id); - if (votes[i].id != id) { - if (allowOneBadLeader && votes[i].id == i) { - allowOneBadLeader = false; - } else { - fail("Thread " + i + " got " + votes[i].id + " expected " + id); - } + + /* + * Lists which threads haven't joined. A thread doesn't join if it hasn't decided + * upon a leader yet. It can happen that a peer is slow or disconnected, and it can + * take longer to nominate and connect to the current leader. + */ + for(int i = 0; i < threads.size(); i++) { + if (threads.get(i).isAlive()) { + LOG.info("Threads didn't join: " + i); } } + + /* + * If we have a majority, then we are good to go. + */ + if(successCount <= count/2){ + fail("Fewer than a a majority has joined"); + } + + if(threads.get((int) leader).isAlive()){ + fail("Leader hasn't joined: " + leader); + } } }