Author: mahadev
Date: Thu Sep 18 18:24:25 2008
New Revision: 696891

URL: http://svn.apache.org/viewvc?rev=696891&view=rev
Log:
ZOOKEEPER-131. Fix: old leader election can elect a dead leader over and over
again. (breed via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=696891&r1=696890&r2=696891&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Sep 18 18:24:25 2008
@@ -61,3 +61,6 @@
 
  ZOOKEEPER-130. update build.xml to support apache release process. 
  (phunt via mahadev)
+
+ ZOOKEEPER-131. Fix: old leader election can elect a dead leader over and
+ over again. (breed via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=696891&r1=696890&r2=696891&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Thu Sep 18 18:24:25 2008
@@ -26,6 +26,8 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Random;
 import java.util.Map.Entry;
 
@@ -54,7 +56,7 @@
         public int winningCount;
     }
 
-    private ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes) {
+    private ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, 
HashSet<Long> heardFrom) {
         ElectionResult result = new ElectionResult();
         // Initialize with null vote
         result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
@@ -62,7 +64,13 @@
         Collection<Vote> votesCast = votes.values();
         // First make the views consistent. Sometimes peers will have
         // different zxids for a server depending on timing.
-        for (Vote v : votesCast) {
+        for (Iterator<Vote> i = votesCast.iterator(); i.hasNext();) {
+            Vote v = i.next();
+            if (!heardFrom.contains(v.id)) {
+                // Discard votes for machines that we didn't hear from
+                i.remove();
+                continue;
+            }
             for (Vote w : votesCast) {
                 if (v.id == w.id) {
                     if (v.zxid < w.zxid) {
@@ -71,6 +79,7 @@
                 }
             }
         }
+        
         HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
         // Now do the tally
         for (Vote v : votesCast) {
@@ -127,6 +136,7 @@
             requestBuffer.clear();
             requestBuffer.putInt(xid);
             requestPacket.setLength(4);
+            HashSet<Long> heardFrom = new HashSet<Long>();
             for (QuorumServer server : self.quorumPeers) {
                 requestPacket.setSocketAddress(server.addr);
                 try {
@@ -145,7 +155,8 @@
                                 + " got " + recvedXid);
                         continue;
                     }
-                    responseBuffer.getLong();
+                    long peerId = responseBuffer.getLong();
+                    heardFrom.add(peerId);
                     //if(server.id != peerId){
                         Vote vote = new Vote(responseBuffer.getLong(),
                             responseBuffer.getLong());
@@ -154,12 +165,13 @@
                         votes.put(addr, vote);
                     //}
                 } catch (IOException e) {
+                    LOG.error("Error in looking for leader", e);
                     // Errors are okay, since hosts may be
                     // down
                     // ZooKeeperServer.logException(e);
                 }
             }
-            ElectionResult result = countVotes(votes);
+            ElectionResult result = countVotes(votes, heardFrom);
             if (result.winner.id >= 0) {
                 self.setCurrentVote(result.vote);
                 if (result.winningCount > (self.quorumPeers.size() / 2)) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=696891&r1=696890&r2=696891&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Sep 18 18:24:25 2008
@@ -160,13 +160,15 @@
             super("ResponderThread");
         }
 
+        volatile boolean running = true;
+        
         @Override
         public void run() {
             try {
                 byte b[] = new byte[36];
                 ByteBuffer responseBuffer = ByteBuffer.wrap(b);
                 DatagramPacket packet = new DatagramPacket(b, b.length);
-                while (true) {
+                while (running) {
                     udpSocket.receive(packet);
                     if (packet.getLength() != 4) {
                         LOG.warn("Got more than just an xid! Len = "
@@ -273,7 +275,32 @@
 
     @Override
     public synchronized void start() {
-        
+        startLeaderElection();
+        super.start();
+    }
+
+    ResponderThread responder;
+    
+    /**
+     * Stops the UDP responder thread, if any. startLeaderElection() only
+     * creates a responder when electionType == 0; otherwise this is a no-op.
+     *
+     * NOTE(review): Thread.interrupt() does not unblock a plain
+     * DatagramSocket.receive(), so the responder only notices
+     * running == false after receive() returns with another packet, and
+     * udpSocket is left open.
+     */
+    public void stopLeaderElection() {
+        if (responder == null) {
+            return;
+        }
+        responder.running = false;
+        responder.interrupt();
+    }
+    /**
+     * Resets currentVote to a vote for this peer, binds udpSocket and starts
+     * the responder when electionType == 0 (wrapping a SocketException in a
+     * RuntimeException), then creates the election algorithm.
+     */
+    public void startLeaderElection() {
         currentVote = new Vote(myid, getLastLoggedZxid());
         for (QuorumServer p : quorumPeers) {
             if (p.id == myid) {
@@ -287,13 +314,13 @@
         if (electionType == 0) {
             try {
                 udpSocket = new DatagramSocket(myQuorumAddr.getPort());
-                new ResponderThread().start();
+                responder = new ResponderThread();
+                responder.start();
             } catch (SocketException e) {
                 throw new RuntimeException(e);
             }
         }
         this.electionAlg = createElectionAlgorithm(electionType);
-        super.start();
     }
     
     /**

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=696891&r1=696890&r2=696891&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Thu Sep 18 18:24:25 2008
@@ -44,4 +44,11 @@
         return (int) (id & zxid);
     }
 
+    /**
+     * Renders the vote as "(id, zxid)", with the zxid in hexadecimal.
+     */
+    @Override
+    public String toString() {
+        return "(" + id + ", " + Long.toHexString(zxid) + ")";
+    }
 }
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java?rev=696891&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java Thu Sep 18 18:24:25 2008
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.LeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+import junit.framework.TestCase;
+
+/**
+ * Leader election test: starts {@code count} peers, each calling
+ * LeaderElection.lookForLeader() from its own LEThread. The first peer that
+ * finds itself elected stops its leader election (simulating a dead leader).
+ * The remaining peers must then agree on a different leader; if they kept
+ * re-electing the dead one they would not finish before the join timeout.
+ */
+public class LETest extends TestCase {
+    volatile Vote votes[];
+    /** When true, the next peer to find itself elected is stopped (once). */
+    volatile boolean leaderDies;
+    /** -1: no leader yet; -2: the elected leader just died; else the leader's id. */
+    volatile long leader = -1;
+    /** Id of the leader that was deliberately stopped, or -1 if none was. */
+    volatile long deadLeader = -1;
+    Random rand = new Random();
+
+    /**
+     * Runs lookForLeader() for one peer until that peer is elected itself or
+     * agrees with the leader announced by another thread.
+     */
+    class LEThread extends Thread {
+        LeaderElection le;
+        int i;
+        QuorumPeer peer;
+        LEThread(LeaderElection le, QuorumPeer peer, int i) {
+            this.le = le;
+            this.i = i;
+            this.peer = peer;
+        }
+        public void run() {
+            try {
+                Vote v = null;
+                while(true) {
+                    v = le.lookForLeader();
+                    votes[i] = v;
+                    if (v.id == i) {
+                        synchronized(LETest.this) {
+                            if (leaderDies) {
+                                // Kill only the first elected leader so the
+                                // remaining peers have to elect someone else.
+                                leaderDies = false;
+                                peer.stopLeaderElection();
+                                System.out.println("Leader " + i + " dying");
+                                deadLeader = i;
+                                leader = -2;
+                            } else {
+                                leader = i;
+                            }
+                            LETest.this.notifyAll();
+                        }
+                        break;
+                    }
+                    synchronized(LETest.this) {
+                        // Loop rather than if: wait() may wake spuriously.
+                        while (leader == -1) {
+                            LETest.this.wait();
+                        }
+                        if (leader == v.id) {
+                            break;
+                        }
+                    }
+                    // Our candidate is not the live leader: back off, reset
+                    // our vote to ourselves and run the election again.
+                    Thread.sleep(rand.nextInt(1000));
+                    peer.setCurrentVote(new Vote(peer.getId(), 0));
+                }
+                System.out.println("Thread " + i + " votes " + v);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                // Preserve the interrupt status for anything above us.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public void testLE() throws Exception {
+        int count = 30;
+        int baseport= 33003;
+        ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>(count);
+        ArrayList<LEThread> threads = new ArrayList<LEThread>(count);
+        File tmpdir[] = new File[count];
+        int port[] = new int[count];
+        votes = new Vote[count];
+        QuorumStats.registerAsConcrete();
+        for(int i = 0; i < count; i++) {
+            peers.add(new QuorumServer(i,
+                    new InetSocketAddress(baseport+100+i)));
+            tmpdir[i] = File.createTempFile("letest", "test");
+            port[i] = baseport+i;
+        }
+        LeaderElection le[] = new LeaderElection[count];
+        leaderDies = true;
+        for(int i = 0; i < le.length; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
+                    port[i], 0, 0, i, 2, 2, 2);
+            peer.startLeaderElection();
+            le[i] = new LeaderElection(peer);
+            LEThread thread = new LEThread(le[i], peer, i);
+            thread.start();
+            threads.add(thread);
+        }
+        for(int i = 0; i < threads.size(); i++) {
+            threads.get(i).join(15000);
+            if (threads.get(i).isAlive()) {
+                fail("Threads didn't join");
+            }
+        }
+        // Every thread must have voted, and every thread except the leader
+        // that was deliberately stopped must agree on the same leader.
+        boolean haveId = false;
+        long id = 0;
+        for(int i = 0; i < votes.length; i++) {
+            if (votes[i] == null) {
+                fail("Thread " + i + " had a null vote");
+            }
+            if (i == deadLeader) {
+                // The stopped leader legitimately kept its vote for itself.
+                continue;
+            }
+            if (!haveId) {
+                id = votes[i].id;
+                haveId = true;
+            } else if (votes[i].id != id) {
+                fail("Thread " + i + " got " + votes[i].id + " expected "
+                        + id);
+            }
+        }
+    }
+}


Reply via email to