Author: breed
Date: Wed Sep 24 14:27:50 2008
New Revision: 698743

URL: http://svn.apache.org/viewvc?rev=698743&view=rev
Log:
ZOOKEEPER-117 threading issues in Leader election

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=698743&r1=698742&r2=698743&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Sep 24 14:27:50 2008
@@ -66,3 +66,6 @@
  again. (breed via mahadev)
 
  ZOOKEEPER-137. client watcher objects can lose events (Patrick Hunt via breed)
+
+ ZOOKEEPER-117. threading issues in Leader election (Flavio Junqueira and Patrick
+ Hunt via breed)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=698743&r1=698742&r2=698743&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Wed Sep 24 14:27:50 2008
@@ -24,6 +24,7 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -59,6 +60,9 @@
 
     QuorumPeer self;
 
+    // the follower acceptor thread
+    FollowerCnxAcceptor cnxAcceptor;
+    
     // list of all the followers
     public HashSet<FollowerHandler> followers = new HashSet<FollowerHandler>();
 
@@ -194,6 +198,42 @@
     ConcurrentLinkedQueue<Proposal> toBeApplied = new 
ConcurrentLinkedQueue<Proposal>();
 
     Proposal newLeaderProposal = new Proposal();
+    
+    class FollowerCnxAcceptor extends Thread{ // accepts follower connections on ss until stopped
+        private volatile boolean stop = false; // set by halt(); checked before each accept()
+        // Accept loop: configures each accepted socket and hands it to a new FollowerHandler.
+        @Override
+        public void run() {
+            try {
+                while (!stop) {
+                    try{
+                        Socket s = ss.accept();
+                        s.setSoTimeout(self.tickTime * self.syncLimit);
+                        s.setTcpNoDelay(true);
+                        new FollowerHandler(s, Leader.this);
+                    } catch (SocketException e) {
+                        if (stop) {
+                            LOG.info("exception while shutting down acceptor: "
+                                    + e);
+                            // Expected after halt(): stop is already true here.
+                            // NOTE(review): presumably Leader.shutdown() closes ss
+                            // after calling halt(), making the blocked accept()
+                            // throw; the loop then exits on the next stop check.
+                            stop = true; // redundant: this branch only runs when stop is already true
+                        } else {
+                            throw e; // unexpected: caught by the outer handler below
+                        }
+                    }
+                }
+            } catch (Exception e) { // any unexpected failure ends the acceptor thread
+                LOG.warn("Exception while accepting follower", e);
+            }
+        }
+        // halt() only sets the flag; it does not unblock a pending ss.accept().
+        public void halt() {
+            stop = true;
+        }
+    }
 
     /**
      * This method is main function that is called to lead
@@ -217,21 +257,12 @@
                     + newLeaderProposal.packet.getZxid());
         }
         outstandingProposals.add(newLeaderProposal);
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    while (true) {
-                        Socket s = ss.accept();
-                        s.setSoTimeout(self.tickTime * self.syncLimit);
-                        s.setTcpNoDelay(true);
-                        new FollowerHandler(s, Leader.this);
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Exception while accepting follower", e);
-                }
-            }
-        }.start();
+        
+        // Start thread that waits for connection requests from 
+        // new followers.
+        cnxAcceptor = new FollowerCnxAcceptor();
+        cnxAcceptor.start();
+        
         // We have to get at least a majority of servers in sync with
         // us. We do this by waiting for the NEWLEADER packet to get
         // acknowledged
@@ -256,9 +287,12 @@
             self.cnxnFactory.setZooKeeperServer(zk);
         }
         // Everything is a go, simply start counting the ticks
-        synchronized (this) {
-            notifyAll();
-        }
+        // WARNING: I couldn't find any wait statement on a synchronized
+        // block that would be notified by this notifyAll() call, so
+        // I commented it out
+        //synchronized (this) {
+        //    notifyAll();
+        //}
         // We ping twice a tick, so we only update the tick every other
         // iteration
         boolean tickSkip = true;
@@ -299,9 +333,12 @@
         if (isShutdown) {
             return;
         }
+        
+        LOG.info("Shutdown called",
+                new Exception("shutdown Leader! reason: " + reason));
 
-        LOG.error("FIXMSG",new Exception("shutdown Leader! reason: "
-                        + reason));
+        cnxAcceptor.halt();
+        
         // NIO should not accept conenctions
         self.cnxnFactory.setZooKeeperServer(null);
         // clear all the connections


Reply via email to