Author: mahadev Date: Thu Sep 18 18:24:25 2008 New Revision: 696891 URL: http://svn.apache.org/viewvc?rev=696891&view=rev Log: ZOOKEEPER-131. Fix Old leader election can elect a dead leader over and over again. (breed via mahadev)
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=696891&r1=696890&r2=696891&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Thu Sep 18 18:24:25 2008 @@ -61,3 +61,6 @@ ZOOKEEPER-130. update build.xml to support apache release process. (phunt via mahadev) + + ZOOKEEPER-131. Fix Old leader election can elect a dead leader over and over + again. (breed via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=696891&r1=696890&r2=696891&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Thu Sep 18 18:24:25 2008 @@ -26,6 +26,8 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Random; import java.util.Map.Entry; @@ -54,7 +56,7 @@ public int winningCount; } - private ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes) { + private ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) { ElectionResult result = new 
ElectionResult(); // Initialize with null vote result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE); @@ -62,7 +64,13 @@ Collection<Vote> votesCast = votes.values(); // First make the views consistent. Sometimes peers will have // different zxids for a server depending on timing. - for (Vote v : votesCast) { + for (Iterator<Vote> i = votesCast.iterator(); i.hasNext();) { + Vote v = i.next(); + if (!heardFrom.contains(v.id)) { + // Discard votes for machines that we didn't hear from + i.remove(); + continue; + } for (Vote w : votesCast) { if (v.id == w.id) { if (v.zxid < w.zxid) { @@ -71,6 +79,7 @@ } } } + HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>(); // Now do the tally for (Vote v : votesCast) { @@ -127,6 +136,7 @@ requestBuffer.clear(); requestBuffer.putInt(xid); requestPacket.setLength(4); + HashSet<Long> heardFrom = new HashSet<Long>(); for (QuorumServer server : self.quorumPeers) { requestPacket.setSocketAddress(server.addr); try { @@ -145,7 +155,8 @@ + " got " + recvedXid); continue; } - responseBuffer.getLong(); + long peerId = responseBuffer.getLong(); + heardFrom.add(peerId); //if(server.id != peerId){ Vote vote = new Vote(responseBuffer.getLong(), responseBuffer.getLong()); @@ -154,12 +165,13 @@ votes.put(addr, vote); //} } catch (IOException e) { + LOG.error("Error in looking for leader", e); // Errors are okay, since hosts may be // down // ZooKeeperServer.logException(e); } } - ElectionResult result = countVotes(votes); + ElectionResult result = countVotes(votes, heardFrom); if (result.winner.id >= 0) { self.setCurrentVote(result.vote); if (result.winningCount > (self.quorumPeers.size() / 2)) { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=696891&r1=696890&r2=696891&view=diff ============================================================================== --- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Sep 18 18:24:25 2008 @@ -160,13 +160,15 @@ super("ResponderThread"); } + volatile boolean running = true; + @Override public void run() { try { byte b[] = new byte[36]; ByteBuffer responseBuffer = ByteBuffer.wrap(b); DatagramPacket packet = new DatagramPacket(b, b.length); - while (true) { + while (running) { udpSocket.receive(packet); if (packet.getLength() != 4) { LOG.warn("Got more than just an xid! Len = " @@ -273,7 +275,17 @@ @Override public synchronized void start() { - + startLeaderElection(); + super.start(); + } + + ResponderThread responder; + + public void stopLeaderElection() { + responder.running = false; + responder.interrupt(); + } + public void startLeaderElection() { currentVote = new Vote(myid, getLastLoggedZxid()); for (QuorumServer p : quorumPeers) { if (p.id == myid) { @@ -287,13 +299,14 @@ if (electionType == 0) { try { udpSocket = new DatagramSocket(myQuorumAddr.getPort()); - new ResponderThread().start(); + responder = new ResponderThread(); + responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } this.electionAlg = createElectionAlgorithm(electionType); - super.start(); + } /** Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=696891&r1=696890&r2=696891&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Thu Sep 18 18:24:25 2008 @@ -44,4 +44,7 @@ return (int) (id & zxid); } + public String toString() { + return "(" + id + ", " + 
Long.toHexString(zxid) + ")"; + } } \ No newline at end of file
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java?rev=696891&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java Thu Sep 18 18:24:25 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.LeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+import junit.framework.TestCase;
+
+/**
+ * Leader-election test added with ZOOKEEPER-131: starts a set of in-process
+ * peers, lets the first peer that elects itself play dead, and checks that
+ * every other peer ends up voting for the same live leader.
+ */
+public class LETest extends TestCase {
+    private static final Logger LOG = Logger.getLogger(LETest.class);
+
+    /** Last vote returned to each election thread, indexed by thread number. */
+    volatile Vote votes[];
+    /** While true, the next peer to elect itself calls stopLeaderElection() instead of leading. */
+    volatile boolean leaderDies;
+    /** -1 until some peer elects itself, -2 once the first leader has died, else the live leader's id. */
+    volatile long leader = -1;
+    Random rand = new Random();
+
+    /** Repeats the election for one peer until its vote matches the announced leader. */
+    class LEThread extends Thread {
+        LeaderElection le;
+        int i;
+        QuorumPeer peer;
+
+        LEThread(LeaderElection le, QuorumPeer peer, int i) {
+            this.le = le;
+            this.i = i;
+            this.peer = peer;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Vote v = null;
+                while (true) {
+                    v = le.lookForLeader();
+                    votes[i] = v;
+                    if (v.id == i) {
+                        // This peer won: either play dead once or announce itself.
+                        synchronized (LETest.this) {
+                            if (leaderDies) {
+                                leaderDies = false;
+                                peer.stopLeaderElection();
+                                LOG.info("Leader " + i + " dying");
+                                leader = -2;
+                            } else {
+                                leader = i;
+                            }
+                            LETest.this.notifyAll();
+                        }
+                        break;
+                    }
+                    synchronized (LETest.this) {
+                        // Loop rather than test once: Object.wait() may wake spuriously.
+                        while (leader == -1) {
+                            LETest.this.wait();
+                        }
+                        if (leader == v.id) {
+                            break;
+                        }
+                    }
+                    // Voted for someone who is not the live leader: back off, reset
+                    // the vote to this peer with zxid 0, and run the election again.
+                    Thread.sleep(rand.nextInt(1000));
+                    peer.setCurrentVote(new Vote(peer.getId(), 0));
+                }
+                LOG.info("Thread " + i + " votes " + v);
+            } catch (InterruptedException e) {
+                LOG.error("Election thread " + i + " interrupted", e);
+                // Restore the flag so whoever owns this thread can still see it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs 30 peers through the election with one simulated leader death and
+     * fails unless all surviving peers agree on the live leader.
+     */
+    public void testLE() throws Exception {
+        int count = 30;
+        int baseport = 33003;
+        ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>(count);
+        ArrayList<LEThread> threads = new ArrayList<LEThread>(count);
+        File tmpdir[] = new File[count];
+        int port[] = new int[count];
+        votes = new Vote[count];
+        QuorumStats.registerAsConcrete();
+        for (int i = 0; i < count; i++) {
+            peers.add(new QuorumServer(i, new InetSocketAddress(baseport + 100 + i)));
+            tmpdir[i] = File.createTempFile("letest", "test");
+            // Do not leave 30 stray temp files behind after every run.
+            tmpdir[i].deleteOnExit();
+            port[i] = baseport + i;
+        }
+        LeaderElection le[] = new LeaderElection[count];
+        leaderDies = true;
+        boolean allowOneBadLeader = leaderDies;
+        for (int i = 0; i < le.length; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 0, 0, i, 2, 2, 2);
+            peer.startLeaderElection();
+            le[i] = new LeaderElection(peer);
+            LEThread thread = new LEThread(le[i], peer, i);
+            thread.start();
+            threads.add(thread);
+        }
+        for (int i = 0; i < threads.size(); i++) {
+            threads.get(i).join(15000);
+            if (threads.get(i).isAlive()) {
+                fail("Threads didn't join");
+            }
+        }
+        // Compare against the announced live leader, not votes[0]: thread 0 may be
+        // the one dead leader, and its slot may be null if its thread exited early.
+        long id = leader;
+        for (int i = 0; i < votes.length; i++) {
+            if (votes[i] == null) {
+                fail("Thread " + i + " had a null vote");
+            }
+            if (votes[i].id != id) {
+                if (allowOneBadLeader && votes[i].id == i) {
+                    allowOneBadLeader = false;
+                } else {
+                    fail("Thread " + i + " got " + votes[i].id + " expected " + id);
+                }
+            }
+        }
+    }
+}