Author: mahadev
Date: Mon Aug 25 14:13:01 2008
New Revision: 688885

URL: http://svn.apache.org/viewvc?rev=688885&view=rev
Log:
ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader election. 
(Flavio Paiva Junqueira via mahadev)

Modified:
    hadoop/zookeeper/trunk/src/java/Changes.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

Modified: hadoop/zookeeper/trunk/src/java/Changes.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/Changes.txt?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/Changes.txt (original)
+++ hadoop/zookeeper/trunk/src/java/Changes.txt Mon Aug 25 14:13:01 2008
@@ -18,3 +18,6 @@
   
  ZOOKEEPER-123. Fix  the wrong class is specified for the logger. (Jakob Homan
  via mahadev)
+
+ ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader
+ election. (Flavio Paiva Junqueira via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -264,11 +264,13 @@
                         break;
                     }
 
+                    Vote current = self.getCurrentVote();
+
                     switch (type) {
                     case 0:
                         // Receive challenge request
                         ToSend c = new ToSend(ToSend.mType.challenge, tag,
-                                self.currentVote.id, self.currentVote.zxid,
+                                current.id, current.zxid,
                                 logicalclock, self.getPeerState(),
                                 (InetSocketAddress) responsePacket
                                         .getSocketAddress());
@@ -309,8 +311,8 @@
                                     recvqueue.offer(n);
 
                                     ToSend a = new ToSend(ToSend.mType.ack,
-                                            tag, self.currentVote.id,
-                                            self.currentVote.zxid,
+                                            tag, current.id,
+                                            current.zxid,
                                             logicalclock, self.getPeerState(),
                                             (InetSocketAddress) addr);
 
@@ -328,7 +330,7 @@
                             recvqueue.offer(n);
 
                             ToSend a = new ToSend(ToSend.mType.ack, tag,
-                                    self.currentVote.id, self.currentVote.zxid,
+                                    current.id, current.zxid,
                                     logicalclock, self.getPeerState(),
                                     (InetSocketAddress) responsePacket
                                             .getSocketAddress());
@@ -685,7 +687,7 @@
 
     QuorumPeer self;
     int port;
-    long logicalclock; /* Election instance */
+    volatile long logicalclock; /* Election instance */
     DatagramSocket mySocket;
     long proposedLeader;
     long proposedZxid;

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -49,26 +49,36 @@
        /* Sequence numbers for messages */
     static int sequencer = 0;
 
-    /*
+    /**
      * Determine how much time a process has to wait
      * once it believes that it has reached the end of
      * leader election.
      */
     static int finalizeWait = 100;
 
-    /*
+    /**
         * Challenge counter to avoid replay attacks
         */
        
        static int challengeCounter = 0;
        
     
-       /*
-        * Connection manager
+       /**
+        * Connection manager. Fast leader election uses TCP for 
+        * communication between peers, and QuorumCnxManager manages
+        * such connections. 
         */
        
        QuorumCnxManager manager;
 
+       
+       /**
+        * Notifications are messages that let other peers know that
+        * a given peer has changed its vote, either because it has
+        * joined leader election or because it learned of another 
+        * peer with higher zxid or same zxid and higher server id
+        */
+       
     static public class Notification {
         /*
          * Proposed leader
@@ -96,8 +106,10 @@
         InetAddress addr;
     }
 
-    /*
-     * Messages to send, both Notifications and Acks
+    /**
+     * Messages that a peer wants to send to other peers.
+     * These messages can be both Notifications and Acks
+     * of reception of notification.
      */
     static public class ToSend {
        static enum mType {crequest, challenge, notification, ack};
@@ -145,12 +157,24 @@
     LinkedBlockingQueue<ToSend> sendqueue;
     LinkedBlockingQueue<Notification> recvqueue;
 
+    /**
+     * Multi-threaded implementation of message handler. Messenger
+     * implements two inner classes: WorkerReceiver and WorkerSender. The
+     * functionality of each is obvious from the name. Each of these
+     * spawns a new thread.
+     */
+    
     private class Messenger {
        
         long lastProposedLeader;
         long lastProposedZxid;
         long lastEpoch;
         
+        /**
+         * Receives messages from instance of QuorumCnxManager on
+         * method run(), and processes such messages.
+         */
+        
         class WorkerReceiver implements Runnable {
 
                QuorumCnxManager manager;
@@ -175,7 +199,7 @@
                                }
                                response.buffer.clear();
                
-
+                               // State of peer that sent this message
                                QuorumPeer.ServerState ackstate = 
QuorumPeer.ServerState.LOOKING;
                                switch (response.buffer.getInt()) {
                                case 0:
@@ -189,6 +213,7 @@
                                        break;
                                }
                        
+                               // Instantiate Notification and set its attributes
                                Notification n = new Notification();
                                n.leader = response.buffer.getLong();
                                n.zxid = response.buffer.getLong();
@@ -196,6 +221,12 @@
                                n.state = ackstate;
                                n.addr = response.addr;
 
+                               /*
+                                * Accept the values of this notification
+                                * if we are at the right epoch and the new notification
+                                * contains a vote that succeeds our current vote
+                                * in our order of votes.
+                                */
                                if ((messenger.lastEpoch <= n.epoch)
                                                && ((n.zxid > 
messenger.lastProposedZxid) 
                                                || ((n.zxid == 
messenger.lastProposedZxid) 
@@ -205,10 +236,18 @@
                                        messenger.lastEpoch = n.epoch;
                                }
 
-                               //InetAddress addr = (InetAddress) 
responsePacket.getSocketAddress();
+                               /*
+                                * If this server is looking, then queue the notification on recvqueue
+                                */
+
                                if(self.getPeerState() == 
QuorumPeer.ServerState.LOOKING){
                                        recvqueue.offer(n);
-                                       if(recvqueue.size() == 0) 
LOG.warn("Message: " + n.addr);
+                                       if(recvqueue.size() == 0) 
LOG.debug("Message: " + n.addr);
+                                       /*
+                                        * Send a notification back if the peer that sent this
+                                        * message is also looking and its logical clock is
+                                        * lagging behind.
+                                        */
                                        if((ackstate == 
QuorumPeer.ServerState.LOOKING)
                                                        && (n.epoch < 
logicalclock)){
                                                ToSend notmsg = new 
ToSend(ToSend.mType.notification, 
@@ -219,16 +258,21 @@
                                                                response.addr);
                                                sendqueue.offer(notmsg);
                                        }
-                               } else {                                        
        
-                                       if((ackstate == 
QuorumPeer.ServerState.LOOKING) &&
-                                                       (self.getPeerState() != 
QuorumPeer.ServerState.LOOKING)){
-                                               ToSend notmsg = new 
ToSend(ToSend.mType.notification, 
-                                                       self.currentVote.id, 
-                                                       self.currentVote.zxid,
-                                                       logicalclock,
-                                                       self.getPeerState(),
-                                                       response.addr);
-                                               sendqueue.offer(notmsg);
+                               } else {
+                                   /*
+                                    * If this server is not looking, but the one that sent the message
+                                    * is looking, then send back this server's current vote.
+                                    */
+                                   Vote current = self.getCurrentVote();
+                                   if(ackstate == 
QuorumPeer.ServerState.LOOKING){
+
+                                       ToSend notmsg = new 
ToSend(ToSend.mType.notification, 
+                                               current.id, 
+                                               current.zxid,
+                                               logicalclock,
+                                               self.getPeerState(),
+                                               response.addr);
+                                       sendqueue.offer(notmsg);
                                        }
                                }
                                
@@ -240,6 +284,12 @@
             }
         }
 
+        
+        /**
+         * This worker simply dequeues a message to send
+         * and queues it on the manager's queue.
+         */
+        
         class WorkerSender implements Runnable {
                
             QuorumCnxManager manager;
@@ -260,6 +310,11 @@
                 }
             }
 
+            /**
+             * Called by run() once there is a new message to send.
+             * 
+             * @param m     message to send
+             */
             private void process(ToSend m) {
                 byte requestBytes[] = new byte[28];
                 ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);  
@@ -279,10 +334,18 @@
             }
         }
 
+        /**
+         * Test if either the send queue or the receive queue is empty.
+         */
         public boolean queueEmpty() {
             return (sendqueue.isEmpty() || recvqueue.isEmpty());
         }
 
+        /**
+         * Constructor of class Messenger.
+         * 
+         * @param manager   Connection manager
+         */
         Messenger(QuorumCnxManager manager) {
             lastProposedLeader = 0;
             lastProposedZxid = 0;
@@ -303,17 +366,36 @@
 
     QuorumPeer self;
     int port;
-    long logicalclock; /* Election instance */
+    volatile long logicalclock; /* Election instance */
     Messenger messenger;
     long proposedLeader;
     long proposedZxid;
 
-        
+    
+    /**
+     * Constructor of FastLeaderElection. It takes two parameters, one
+     * is the QuorumPeer object that instantiated this object, and the other
+     * is the connection manager. Such an object should be created only once 
+     * by each peer during an instance of the ZooKeeper service.
+     * 
+     * @param self  QuorumPeer that created this object
+     * @param manager   Connection manager
+     */
     public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.manager = manager;
        starter(self, manager);
     }
     
+    /**
+     * This method is invoked by the constructor. Because it is a
+     * part of the starting procedure of the object that must run in
+     * every constructor of this class, it is probably best to keep it as
+     * a separate method. As we have a single constructor currently, 
+     * it is not strictly necessary to have it separate.
+     * 
+     * @param self      QuorumPeer that created this object
+     * @param manager   Connection manager   
+     */
     private void starter(QuorumPeer self, QuorumCnxManager manager) {
         this.self = self;
         proposedLeader = -1;
@@ -328,6 +410,7 @@
         recvqueue.clear();
     }
 
+    
     public static class ElectionResult {
         public Vote vote;
 
@@ -338,6 +421,9 @@
         public int winningCount;
     }
 
+    /**
+     * Send notifications to all peers upon a change in our vote
+     */
     private void sendNotifications() {
         for (QuorumServer server : self.quorumPeers) {
             InetAddress saddr = server.addr.getAddress();
@@ -353,6 +439,13 @@
         }
     }
 
+    /**
+     * Check if a pair (server id, zxid) succeeds our
+     * current vote.
+     * 
+     * @param id    Server identifier
+     * @param zxid  Last zxid observed by the issuer of this vote
+     */
     private boolean totalOrderPredicate(long id, long zxid) {
         if ((zxid > proposedZxid)
                 || ((zxid == proposedZxid) && (id > proposedLeader)))
@@ -362,6 +455,14 @@
 
     }
 
+    /**
+     * Termination predicate. Given a set of votes, determines if
+     * we have sufficient votes to declare the end of the election round.
+     * 
+     *  @param votes    Set of votes
+     *  @param l        Identifier of the vote received last
+     *  @param zxid     zxid of the vote received last
+     */
     private boolean termPredicate(
             HashMap<InetAddress, Vote> votes, long l,
             long zxid) {
@@ -384,6 +485,11 @@
 
     }
 
+    /**
+     * Starts a new round of leader election. Whenever our QuorumPeer 
+     * changes its state to LOOKING, this method is invoked, and it 
+     * sends notifications to all other peers.
+     */
     public Vote lookForLeader() throws InterruptedException {
         HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
 
@@ -394,7 +500,7 @@
         proposedLeader = self.getId();
         proposedZxid = self.getLastLoggedZxid();
 
-        LOG.warn("Election tally: " + proposedZxid);
+        LOG.warn("New election: " + proposedZxid);
         sendNotifications();
 
         /*
@@ -449,7 +555,7 @@
 
                 } else if (termPredicate(recvset, proposedLeader, 
proposedZxid)) {
                     //Otherwise, wait for a fixed amount of time
-                    LOG.warn("Passed predicate");
+                    LOG.debug("Passed predicate");
                     Thread.sleep(finalizeWait);
 
                     // Verify if there is any change in the proposed leader

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Mon Aug 25 14:13:01 2008
@@ -114,15 +114,16 @@
     void followLeader() throws InterruptedException {
         InetSocketAddress addr = null;
         // Find the leader by id
+        Vote current = self.getCurrentVote();
         for (QuorumServer s : self.quorumPeers) {
-            if (s.id == self.currentVote.id) {
+            if (s.id == current.id) {
                 addr = s.addr;
                 break;
             }
         }
         if (addr == null) {
             LOG.warn("Couldn't find the leader with id = "
-                    + self.currentVote.id);
+                    + current.id);
         }
         LOG.info("Following " + addr);
         sock = new Socket();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Mon Aug 25 14:13:01 2008
@@ -100,7 +100,7 @@
     }
 
     public Vote lookForLeader() throws InterruptedException {
-        self.currentVote = new Vote(self.getId(), self.getLastLoggedZxid());
+        self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
         // We are going to look for a leader by casting a vote for ourself
         byte requestBytes[] = new byte[4];
         ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
@@ -161,16 +161,17 @@
             }
             ElectionResult result = countVotes(votes);
             if (result.winner.id >= 0) {
-                self.currentVote = result.vote;
+                self.setCurrentVote(result.vote);
                 if (result.winningCount > (self.quorumPeers.size() / 2)) {
-                    self.currentVote = result.winner;
+                    self.setCurrentVote(result.winner);
                     s.close();
-                    self.setPeerState((self.currentVote.id == self.getId()) 
+                    Vote current = self.getCurrentVote();
+                    self.setPeerState((current.id == self.getId()) 
                             ? ServerState.LEADING: ServerState.FOLLOWING);
                     if (self.getPeerState() == ServerState.FOLLOWING) {
                         Thread.sleep(100);
                     }
-                    return self.currentVote;
+                    return current;
                 }
             }
             Thread.sleep(1000);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug 25 14:13:01 2008
@@ -493,6 +493,8 @@
             this.addr = channel.socket().getInetAddress();
             this.channel = channel;
             recvWorker = null;
+            
+            LOG.debug("Address of remote peer: " + this.addr);
         }
 
         void setRecv(RecvWorker recvWorker) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Aug 25 14:13:01 2008
@@ -111,7 +111,15 @@
     /**
      * This is who I think the leader currently is.
      */
-    volatile Vote currentVote;
+    volatile private Vote currentVote;
+        
+    public synchronized Vote getCurrentVote(){
+        return currentVote;
+    }
+       
+    public synchronized void setCurrentVote(Vote v){
+        currentVote = v;
+    }    
 
     volatile boolean running = true;
 
@@ -167,10 +175,11 @@
                         responseBuffer.clear();
                         responseBuffer.getInt(); // Skip the xid
                         responseBuffer.putLong(myid);
-                        switch (state) {
+                        Vote current = getCurrentVote();
+                        switch (getPeerState()) {
                         case LOOKING:
-                            responseBuffer.putLong(currentVote.id);
-                            responseBuffer.putLong(currentVote.zxid);
+                            responseBuffer.putLong(current.id);
+                            responseBuffer.putLong(current.zxid);
                             break;
                         case LEADING:
                             responseBuffer.putLong(myid);
@@ -182,7 +191,7 @@
                             }
                             break;
                         case FOLLOWING:
-                            responseBuffer.putLong(currentVote.id);
+                            responseBuffer.putLong(current.id);
                             try {
                                 responseBuffer.putLong(follower.getZxid());
                             } catch (NullPointerException npe) {
@@ -205,11 +214,11 @@
 
     private ServerState state = ServerState.LOOKING;
 
-    public void setPeerState(ServerState newState){
+    public synchronized void setPeerState(ServerState newState){
         state=newState;
     }
 
-    public ServerState getPeerState(){
+    public synchronized ServerState getPeerState(){
         return state;
     }
 
@@ -364,14 +373,14 @@
          * Main loop
          */
         while (running) {
-            switch (state) {
+            switch (getPeerState()) {
             case LOOKING:
                 try {
                     LOG.info("LOOKING");
-                    currentVote = makeLEStrategy().lookForLeader();
+                    setCurrentVote(makeLEStrategy().lookForLeader());
                 } catch (Exception e) {
                     LOG.warn("Unexpected exception",e);
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             case FOLLOWING:
@@ -384,7 +393,7 @@
                 } finally {
                     follower.shutdown();
                     setFollower(null);
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             case LEADING:
@@ -400,7 +409,7 @@
                         leader.shutdown("Forcing shutdown");
                         setLeader(null);
                     }
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             }
@@ -504,7 +513,7 @@
     }
 
     public String getServerState() {
-        switch (state) {
+        switch (getPeerState()) {
         case LOOKING:
             return QuorumStats.Provider.LOOKING_STATE;
         case LEADING:


Reply via email to