Author: mahadev Date: Fri Dec 12 12:21:30 2008 New Revision: 726110 URL: http://svn.apache.org/viewvc?rev=726110&view=rev Log: ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=726110&r1=726109&r2=726110&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 12 12:21:30 2008 @@ -73,6 +73,7 @@ ZOOKEEPER-247. fix formatting of C API in ACL section of programmer guide. (patrick hunt via mahadev) + ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev) Release 3.0.0 - 2008-10-21 Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=726110&r1=726109&r2=726110&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Dec 12 12:21:30 2008 @@ -46,22 +46,21 @@ public class FastLeaderElection implements Election { private static final Logger LOG = Logger.getLogger(FastLeaderElection.class); - /* Sequence numbers for messages */ - static int sequencer = 0; - /** * Determine how much time a process has to wait * once it believes that it has reached the end of * leader election. */ - static int finalizeWait = 200; + final static int finalizeWait = 200; - /** - * Challenge counter to avoid replay attacks + + /** + * Upper bound on the amount of time between two consecutive + * notification checks. This impacts the amount of time to get + * the system up again after long partitions. Currently 60 seconds. */ - static int challengeCounter = 0; - + final static int maxNotificationInterval = 60000; /** * Connection manager. Fast leader election uses TCP for @@ -509,6 +508,8 @@ HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); + int notTimeout = finalizeWait; + synchronized(this){ logicalclock++; updateProposal(self.getId(), self.getLastLoggedZxid()); @@ -526,7 +527,7 @@ * Remove next notification from queue, times out after 2 times * the termination time */ - Notification n = recvqueue.poll(2*finalizeWait, TimeUnit.MILLISECONDS); + Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. @@ -535,89 +536,104 @@ if(n == null){ if(manager.haveDelivered()){ sendNotifications(); + } else { + manager.connectAll(); } + + /* + * Exponential backoff + */ + int tmpTimeOut = notTimeout*2; + notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); + LOG.info("Notification time out: " + notTimeout); } - else switch (n.state) { - case LOOKING: - // If notification > current, replace and send messages out - LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + - n.epoch + ", " + self.getId() + ", " + self.getPeerState() + - ", " + n.state + ", " + n.sid); - if (n.epoch > logicalclock) { - logicalclock = n.epoch; - recvset.clear(); - updateProposal(self.getId(), self.getLastLoggedZxid()); - sendNotifications(); - } else if (n.epoch < logicalclock) { - break; - } else if (totalOrderPredicate(n.leader, n.zxid)) { - updateProposal(n.leader, n.zxid); - sendNotifications(); - } + else { + //notTimeout = finalizeWait; + switch (n.state) { + case LOOKING: + // If notification > current, replace and send messages out + LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + + ", " + n.state + ", " + n.sid); + if (n.epoch > logicalclock) { + logicalclock = n.epoch; + recvset.clear(); + updateProposal(self.getId(), self.getLastLoggedZxid()); + sendNotifications(); + } else if (n.epoch < logicalclock) { + break; + } else if (totalOrderPredicate(n.leader, n.zxid)) { + updateProposal(n.leader, n.zxid); + sendNotifications(); + } - recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); + recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); - //If have received from all nodes, then terminate - if (self.quorumPeers.size() == recvset.size()) { - self.setPeerState((proposedLeader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); - leaveInstance(); - return new Vote(proposedLeader, proposedZxid); - - } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) { - //Otherwise, wait for a fixed amount of time - LOG.debug("Passed predicate"); - - // Verify if there is any change in the proposed leader - while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ - if(totalOrderPredicate(n.leader, n.zxid)){ - recvqueue.put(n); - break; - } - } - - if (n == null) { + //If have received from all nodes, then terminate + if (self.quorumPeers.size() == recvset.size()) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: ServerState.FOLLOWING); - LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState()); leaveInstance(); - return new Vote(proposedLeader, + return new Vote(proposedLeader, proposedZxid); + + } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) { + //Otherwise, wait for a fixed amount of time + LOG.debug("Passed predicate"); + + // Verify if there is any change in the proposed leader + while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ + if(totalOrderPredicate(n.leader, n.zxid)){ + recvqueue.put(n); + break; + } + } + + if (n == null) { + self.setPeerState((proposedLeader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + LOG.info("About to leave instance:" + proposedLeader + ", " + + proposedZxid + ", " + self.getId() + ", " + self.getPeerState()); + leaveInstance(); + return new Vote(proposedLeader, proposedZxid); + } } - } - break; - case LEADING: - /* - * There is at most one leader for each epoch, so if a peer claims to - * be the leader for an epoch, then that peer must be the leader (no - * arbitrary failures assumed). Now, if there is no quorum supporting - * this leader, then processes will naturally move to a new epoch. - */ - if(n.epoch == logicalclock){ - self.setPeerState((n.leader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); + break; + case LEADING: + /* + * There is at most one leader for each epoch, so if a peer claims to + * be the leader for an epoch, then that peer must be the leader (no + * arbitrary failures assumed). Now, if there is no quorum supporting + * this leader, then processes will naturally move to a new epoch. + */ + if(n.epoch == logicalclock){ + self.setPeerState((n.leader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); - leaveInstance(); - return new Vote(n.leader, n.zxid); - } - case FOLLOWING: - LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid); + leaveInstance(); + return new Vote(n.leader, n.zxid); + } + case FOLLOWING: + LOG.info("Notification: " + n.leader + ", " + n.zxid + + ", " + n.epoch + ", " + self.getId() + ", " + + self.getPeerState() + ", " + n.state + ", " + n.sid); - outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state)); + outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state)); - if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state)) - && checkLeader(outofelection, n.leader, n.epoch)) { - synchronized(this){ - logicalclock = n.epoch; - self.setPeerState((n.leader == self.getId()) ? - ServerState.LEADING: ServerState.FOLLOWING); + if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state)) + && checkLeader(outofelection, n.leader, n.epoch)) { + synchronized(this){ + logicalclock = n.epoch; + self.setPeerState((n.leader == self.getId()) ? + ServerState.LEADING: ServerState.FOLLOWING); + } + leaveInstance(); + return new Vote(n.leader, n.zxid); } - leaveInstance(); - return new Vote(n.leader, n.zxid); + break; + default: + break; } - break; - default: - break; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=726110&r1=726109&r2=726110&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Dec 12 12:21:30 2008 @@ -25,10 +25,10 @@ import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.HashMap; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.Enumeration; import org.apache.log4j.Logger; @@ -120,7 +120,6 @@ } public QuorumCnxManager(QuorumPeer self) { - this.port = port; this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY); this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>(); this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>(); @@ -153,11 +152,7 @@ */ boolean initiateConnection(SocketChannel s, Long sid) { - boolean challenged = true; - boolean wins = false; - long newChallenge; - - try { + try { // Sending id and challenge byte[] msgBytes = new byte[8]; ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); @@ -173,11 +168,11 @@ // If lost the challenge, then drop the new connection if (sid > self.getId()) { try { - LOG.warn("Have smaller server identifier, so dropping the connection: (" + - sid + ", " + self.getId()); + LOG.info("Have smaller server identifier, so dropping the connection: (" + + sid + ", " + self.getId() + ")"); s.socket().close(); } catch (IOException e) { - LOG.warn("Error when closing socket or trying to reopen connection: " + LOG.error("Error when closing socket or trying to reopen connection: " + e.toString()); } @@ -220,9 +215,6 @@ * */ boolean receiveConnection(SocketChannel s) { - boolean challenged = true; - boolean wins = false; - long newChallenge; Long sid = null; try { @@ -236,7 +228,7 @@ // Read server id sid = Long.valueOf(msgBuffer.getLong()); } catch (IOException e) { - LOG.warn("Exception reading or writing challenge: " + LOG.info("Exception reading or writing challenge: " + e.toString()); return false; } @@ -246,7 +238,7 @@ try { SendWorker sw = senderWorkerMap.get(sid); - LOG.warn("Create new connection"); + LOG.info("Create new connection to server: " + sid); //sw.connect(); s.socket().close(); if(sw != null) sw.finish(); @@ -256,7 +248,7 @@ } } catch (IOException e) { - LOG.warn("Error when closing socket or trying to reopen connection: " + LOG.info("Error when closing socket or trying to reopen connection: " + e.toString()); } //Otherwise start worker threads to receive data. @@ -290,8 +282,8 @@ } /** - * Processes invoke this message to send a message. Currently, only leader - * election uses it. + * Processes invoke this message to queue a message to send. Currently, + * only leader election uses it. */ void toSend(Long sid, ByteBuffer b) { /* @@ -322,39 +314,64 @@ queueSendMap.get(sid).take(); } queueSendMap.get(sid).put(b); - } + } + + connectOne(sid); - //synchronized (senderWorkerMap) { - if ((senderWorkerMap.get(sid) == null)) { - SocketChannel channel; - try { - channel = SocketChannel - .open(self.quorumPeers.get(sid).electionAddr); - channel.socket().setTcpNoDelay(true); - initiateConnection(channel, sid); - } catch (IOException e) { - LOG.warn("Cannot open channel to " - + sid + "( " + e.toString() - + ")"); - } - } - //} } catch (InterruptedException e) { LOG.warn("Interrupted while waiting to put message in queue." + e.toString()); } } + + /** + * Try to establish a connection to server with id sid. + * + * @param sid server id + */ + + void connectOne(long sid){ + if ((senderWorkerMap.get(sid) == null)) { + SocketChannel channel; + try { + channel = SocketChannel + .open(self.quorumPeers.get(sid).electionAddr); + channel.socket().setTcpNoDelay(true); + initiateConnection(channel, sid); + } catch (IOException e) { + LOG.warn("Cannot open channel to " + + sid + "( " + e.toString() + + ")"); + } + } + } + + + /** + * Try to establish a connection with each server if one + * doesn't exist. + */ + + void connectAll(){ + long sid; + for(Enumeration<Long> en = queueSendMap.keys(); + en.hasMoreElements();){ + sid = en.nextElement(); + connectOne(sid); + } + } + /** * Check if all queues are empty, indicating that all messages have been delivered. */ boolean haveDelivered() { for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) { - if (queue.size() == 0) - return true; + if (queue.size() != 0) + return false; } - return false; + return true; } /**