Author: mahadev
Date: Fri Dec 12 12:21:30 2008
New Revision: 726110

URL: http://svn.apache.org/viewvc?rev=726110&view=rev
Log:
ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 12 12:21:30 2008
@@ -73,6 +73,7 @@
    ZOOKEEPER-247. fix formatting of C API in ACL section of programmer guide.
    (patrick hunt via mahadev)
 
+   ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)
 
 Release 3.0.0 - 2008-10-21
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
 Fri Dec 12 12:21:30 2008
@@ -46,22 +46,21 @@
 public class FastLeaderElection implements Election {
     private static final Logger LOG = 
Logger.getLogger(FastLeaderElection.class);
 
-       /* Sequence numbers for messages */
-    static int sequencer = 0;
-
     /**
      * Determine how much time a process has to wait
      * once it believes that it has reached the end of
      * leader election.
      */
-    static int finalizeWait = 200;
+    final static int finalizeWait = 200;
 
-    /**
-        * Challenge counter to avoid replay attacks
+
+       /**
+        * Upper bound on the amount of time between two consecutive
+        * notification checks. This impacts the amount of time to get
+        * the system up again after long partitions. Currently 60 seconds. 
         */
        
-       static int challengeCounter = 0;
-       
+    final static int maxNotificationInterval = 60000;
     
        /**
         * Connection manager. Fast leader election uses TCP for 
@@ -509,6 +508,8 @@
 
         HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
 
+        int notTimeout = finalizeWait;
+        
         synchronized(this){
             logicalclock++;
             updateProposal(self.getId(), self.getLastLoggedZxid());
@@ -526,7 +527,7 @@
              * Remove next notification from queue, times out after 2 times
              * the termination time
              */
-            Notification n = recvqueue.poll(2*finalizeWait, TimeUnit.MILLISECONDS);
+            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
             
             /*
              * Sends more notifications if haven't received enough.
@@ -535,89 +536,104 @@
             if(n == null){
                if(manager.haveDelivered()){
                        sendNotifications();
+               } else {
+                   manager.connectAll();
                }
+               
+               /*
+                * Exponential backoff
+                */
+               int tmpTimeOut = notTimeout*2;
+               notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
+               LOG.info("Notification time out: " + notTimeout);
             }
-            else switch (n.state) {
-            case LOOKING:
-                // If notification > current, replace and send messages out
-                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
-                        n.epoch + ", " + self.getId() + ", " + 
self.getPeerState() + 
-                        ", " + n.state + ", " + n.sid);
-                if (n.epoch > logicalclock) {
-                    logicalclock = n.epoch;
-                    recvset.clear();
-                    updateProposal(self.getId(), self.getLastLoggedZxid());
-                    sendNotifications();
-                } else if (n.epoch < logicalclock) {
-                    break;
-                } else if (totalOrderPredicate(n.leader, n.zxid)) {
-                    updateProposal(n.leader, n.zxid);
-                    sendNotifications();
-                }
+            else { 
+                //notTimeout = finalizeWait;
+                switch (n.state) { 
+                case LOOKING:
+                    // If notification > current, replace and send messages out
+                    LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
+                            n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
+                            ", " + n.state + ", " + n.sid);
+                    if (n.epoch > logicalclock) {
+                        logicalclock = n.epoch;
+                        recvset.clear();
+                        updateProposal(self.getId(), self.getLastLoggedZxid());
+                        sendNotifications();
+                    } else if (n.epoch < logicalclock) {
+                        break;
+                    } else if (totalOrderPredicate(n.leader, n.zxid)) {
+                        updateProposal(n.leader, n.zxid);
+                        sendNotifications();
+                    }
                 
-                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
 
-                //If have received from all nodes, then terminate
-                if (self.quorumPeers.size() == recvset.size()) {
-                    self.setPeerState((proposedLeader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
-                    leaveInstance();
-                    return new Vote(proposedLeader, proposedZxid);
-
-                } else if (termPredicate(recvset, new Vote(proposedLeader, 
proposedZxid, logicalclock))) {
-                    //Otherwise, wait for a fixed amount of time
-                    LOG.debug("Passed predicate");
-
-                    // Verify if there is any change in the proposed leader
-                    while((n = recvqueue.poll(finalizeWait, 
TimeUnit.MILLISECONDS)) != null){
-                        if(totalOrderPredicate(n.leader, n.zxid)){
-                            recvqueue.put(n);
-                            break;
-                        }
-                    }
-                    
-                    if (n == null) {
+                    //If have received from all nodes, then terminate
+                    if (self.quorumPeers.size() == recvset.size()) {
                         self.setPeerState((proposedLeader == self.getId()) ? 
                                 ServerState.LEADING: ServerState.FOLLOWING);
-                        LOG.info("About to leave instance:" + proposedLeader + 
", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
                         leaveInstance();
-                        return new Vote(proposedLeader,
+                        return new Vote(proposedLeader, proposedZxid);
+                        
+                    } else if (termPredicate(recvset, new Vote(proposedLeader, 
proposedZxid, logicalclock))) {
+                        //Otherwise, wait for a fixed amount of time
+                        LOG.debug("Passed predicate");
+
+                        // Verify if there is any change in the proposed leader
+                        while((n = recvqueue.poll(finalizeWait, 
TimeUnit.MILLISECONDS)) != null){
+                            if(totalOrderPredicate(n.leader, n.zxid)){
+                                recvqueue.put(n);
+                                break;
+                            }
+                        }
+                    
+                        if (n == null) {
+                            self.setPeerState((proposedLeader == self.getId()) ? 
+                                ServerState.LEADING: ServerState.FOLLOWING);
+                            LOG.info("About to leave instance:" + proposedLeader + ", " + 
+                                    proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
+                            leaveInstance();
+                            return new Vote(proposedLeader,
                                 proposedZxid);
+                        }
                     }
-                }
-                break;
-            case LEADING:
-                /*
-                 * There is at most one leader for each epoch, so if a peer 
claims to
-                 * be the leader for an epoch, then that peer must be the 
leader (no
-                 * arbitrary failures assumed). Now, if there is no quorum 
supporting 
-                 * this leader, then processes will naturally move to a new 
epoch.
-                 */
-                if(n.epoch == logicalclock){
-                    self.setPeerState((n.leader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
+                    break;
+                case LEADING:
+                    /*
+                     * There is at most one leader for each epoch, so if a peer claims to
+                     * be the leader for an epoch, then that peer must be the leader (no
+                     * arbitrary failures assumed). Now, if there is no quorum supporting 
+                     * this leader, then processes will naturally move to a new epoch.
+                     */
+                    if(n.epoch == logicalclock){
+                        self.setPeerState((n.leader == self.getId()) ? 
+                                ServerState.LEADING: ServerState.FOLLOWING);
                    
-                    leaveInstance();
-                    return new Vote(n.leader, n.zxid);
-                }
-            case FOLLOWING:
-                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + 
", " + n.sid);
+                        leaveInstance();
+                        return new Vote(n.leader, n.zxid);
+                    }
+                case FOLLOWING:
+                    LOG.info("Notification: " + n.leader + ", " + n.zxid + 
+                            ", " + n.epoch + ", " + self.getId() + ", " + 
+                            self.getPeerState() + ", " + n.state + ", " + 
n.sid);
        
-                outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, 
n.state));
+                    outofelection.put(n.sid, new Vote(n.leader, n.zxid, 
n.epoch, n.state));
 
-                if (termPredicate(outofelection, new Vote(n.leader, n.zxid, 
n.epoch, n.state))
-                        && checkLeader(outofelection, n.leader, n.epoch)) {
-                    synchronized(this){
-                        logicalclock = n.epoch;
-                        self.setPeerState((n.leader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
+                    if (termPredicate(outofelection, new Vote(n.leader, 
n.zxid, n.epoch, n.state))
+                            && checkLeader(outofelection, n.leader, n.epoch)) {
+                        synchronized(this){
+                            logicalclock = n.epoch;
+                            self.setPeerState((n.leader == self.getId()) ? 
+                                    ServerState.LEADING: 
ServerState.FOLLOWING);
+                        }
+                        leaveInstance();
+                        return new Vote(n.leader, n.zxid);
                     }
-                    leaveInstance();
-                    return new Vote(n.leader, n.zxid);
+                    break;
+                default:
+                    break;
                 }
-                break;
-            default:
-                break;
             }
         }
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 Fri Dec 12 12:21:30 2008
@@ -25,10 +25,10 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.Enumeration;
 
 import org.apache.log4j.Logger;
 
@@ -120,7 +120,6 @@
     }
 
     public QuorumCnxManager(QuorumPeer self) {
-        this.port = port;
         this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, 
ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -153,11 +152,7 @@
      */
 
     boolean initiateConnection(SocketChannel s, Long sid) {
-        boolean challenged = true;
-        boolean wins = false;
-        long newChallenge;
-        
-        try {
+         try {
             // Sending id and challenge
             byte[] msgBytes = new byte[8];
             ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
@@ -173,11 +168,11 @@
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
             try {
-                LOG.warn("Have smaller server identifier, so dropping the connection: (" + 
-                        sid + ", " + self.getId());
+                LOG.info("Have smaller server identifier, so dropping the connection: (" + 
+                        sid + ", " + self.getId() + ")");
                 s.socket().close();
             } catch (IOException e) {
-                LOG.warn("Error when closing socket or trying to reopen connection: "
+                LOG.error("Error when closing socket or trying to reopen connection: "
                                 + e.toString());
 
             }
@@ -220,9 +215,6 @@
      * 
      */
     boolean receiveConnection(SocketChannel s) {
-        boolean challenged = true;
-        boolean wins = false;
-        long newChallenge;
         Long sid = null;
         
         try {
@@ -236,7 +228,7 @@
             // Read server id
             sid = Long.valueOf(msgBuffer.getLong());
         } catch (IOException e) {
-            LOG.warn("Exception reading or writing challenge: "
+            LOG.info("Exception reading or writing challenge: "
                     + e.toString());
             return false;
         }
@@ -246,7 +238,7 @@
             try {
                 SendWorker sw = senderWorkerMap.get(sid);
 
-                LOG.warn("Create new connection");
+                LOG.info("Create new connection to server: " + sid);
                 //sw.connect();
                 s.socket().close();
                 if(sw != null) sw.finish();
@@ -256,7 +248,7 @@
                 }
                 
             } catch (IOException e) {
-                LOG.warn("Error when closing socket or trying to reopen connection: "
+                LOG.info("Error when closing socket or trying to reopen connection: "
                                 + e.toString());
             }
         //Otherwise start worker threads to receive data.
@@ -290,8 +282,8 @@
     }
 
     /**
-     * Processes invoke this message to send a message. Currently, only leader
-     * election uses it.
+     * Processes invoke this message to queue a message to send. Currently, 
+     * only leader election uses it.
      */
     void toSend(Long sid, ByteBuffer b) {
         /*
@@ -322,39 +314,64 @@
                         queueSendMap.get(sid).take();
                     }
                     queueSendMap.get(sid).put(b);
-                }
+                } 
+                
+                connectOne(sid);
                 
-                //synchronized (senderWorkerMap) {
-                    if ((senderWorkerMap.get(sid) == null)) {
-                        SocketChannel channel;
-                        try {
-                            channel = SocketChannel
-                                    .open(self.quorumPeers.get(sid).electionAddr);
-                            channel.socket().setTcpNoDelay(true);
-                            initiateConnection(channel, sid);
-                        } catch (IOException e) {
-                            LOG.warn("Cannot open channel to "
-                                    + sid + "( " + e.toString()
-                                    + ")");
-                        }
-                    }
-                //}     
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting to put message in queue."
                                 + e.toString());
             }
     }
+    
+    /**
+     * Try to establish a connection to server with id sid.
+     * 
+     *  @param sid  server id
+     */
+    
+    void connectOne(long sid){
+        if ((senderWorkerMap.get(sid) == null)) {
+            SocketChannel channel;
+            try {
+                channel = SocketChannel
+                        .open(self.quorumPeers.get(sid).electionAddr);
+                channel.socket().setTcpNoDelay(true);
+                initiateConnection(channel, sid);
+            } catch (IOException e) {
+                LOG.warn("Cannot open channel to "
+                        + sid + "( " + e.toString()
+                        + ")");
+            }
+        }
+    }
+    
+    
+    /**
+     * Try to establish a connection with each server if one
+     * doesn't exist.
+     */
+    
+    void connectAll(){
+        long sid;
+        for(Enumeration<Long> en = queueSendMap.keys();
+            en.hasMoreElements();){
+            sid = en.nextElement();
+            connectOne(sid);
+        }      
+    }
+    
 
     /**
      * Check if all queues are empty, indicating that all messages have been delivered.
      */
     boolean haveDelivered() {
         for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
-            if (queue.size() == 0)
-                return true;
+            if (queue.size() != 0)
+                return false;
         }
 
-        return false;
+        return true;
     }
 
     /**


Reply via email to