Author: mahadev
Date: Tue Oct 27 21:14:00 2009
New Revision: 830340

URL: http://svn.apache.org/viewvc?rev=830340&view=rev
Log:
ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=830340&r1=830339&r2=830340&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 27 21:14:00 2009
@@ -92,6 +92,8 @@
 
   ZOOKEEPER-554. zkpython can segfault when statting a deleted node
   (henry robinson via phunt)
+
+  ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev)
  
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=830340&r1=830339&r2=830340&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Oct 27 21:14:00 2009
@@ -28,6 +28,7 @@
 import java.util.Enumeration;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
@@ -314,9 +315,15 @@
      */
     
     synchronized void connectOne(long sid){
-        if (senderWorkerMap.get(sid) == null){ 
-            InetSocketAddress electionAddr =
-                self.quorumPeers.get(sid).electionAddr;
+        if (senderWorkerMap.get(sid) == null){
+            InetSocketAddress electionAddr;
+            if(self.quorumPeers.containsKey(sid))
+                electionAddr =
+                    self.quorumPeers.get(sid).electionAddr;
+            else{
+                LOG.warn("Invalid server id: " + sid);
+                return;
+            }
             try {
                 SocketChannel channel;
                 LOG.debug("Opening channel to server "  + sid);
@@ -403,27 +410,38 @@
          */
         @Override
         public void run() {
-            try {
-                ss = ServerSocketChannel.open();
-                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
-                LOG.info("My election bind port: " + port);
-                ss.socket().setReuseAddress(true); 
-                ss.socket().bind(new InetSocketAddress(port));
-
-                while (!shutdown) {
-                    SocketChannel client = ss.accept();
-                    Socket sock = client.socket();
-                    sock.setTcpNoDelay(true);
+            int numRetries = 0;
+            while((!shutdown) && (numRetries < 3)){
+                try {
+                    ss = ServerSocketChannel.open();
+                    int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+                    LOG.info("My election bind port: " + port);
+                    ss.socket().setReuseAddress(true); 
+                    ss.socket().bind(new InetSocketAddress(port));
+
+                    while (!shutdown) {
+                        SocketChannel client = ss.accept();
+                        Socket sock = client.socket();
+                        sock.setTcpNoDelay(true);
                     
-                    LOG.debug("Connection request "
-                            + sock.getRemoteSocketAddress());
+                        LOG.debug("Connection request "
+                                + sock.getRemoteSocketAddress());
                     
-                    LOG.debug("Connection request: " + self.getId());
-                    receiveConnection(client);
+                        LOG.debug("Connection request: " + self.getId());
+                        receiveConnection(client);
+                        numRetries = 0;
+                    }
+                } catch (IOException e) {
+                    LOG.error("Exception while listening", e);
+                    numRetries++;
                 }
-            } catch (IOException e) {
-                LOG.error("Listener.run: " + e.getMessage());
             }
+            LOG.info("Leaving listener");
+            if(!shutdown)
+                LOG.fatal("As I'm leaving the listener thread, " +
+                               "I won't be able to participate in leader " +
+                               "election any longer: " + 
+                               self.quorumPeers.get(self.getId()).electionAddr);
         }
         
         /**
@@ -530,7 +548,7 @@
                 try {
                     ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); 
                   
                     if(bq != null) 
-                        b = bq.take();
+                        b = bq.poll(1000, TimeUnit.MILLISECONDS);
                     else {
                         LOG.error("No queue of incoming messages for server " + sid);
                         this.finish();
@@ -541,11 +559,11 @@
                     continue;
                 }
                 
-                if(b != null)
-                    lastMessageSent.put(sid, b);
-                
                 try {
-                    if(b != null) send(b);
+                    if(b != null){
+                        lastMessageSent.put(sid, b);
+                        send(b);
+                    }
                 } catch (Exception e) {
                     LOG.warn("Exception when using channel: " + sid, e);
                     this.finish();
@@ -617,11 +635,14 @@
                     }
                 }
 
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.warn("Connection broken: ", e);
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted while trying to add new "
-                        + "message to the reception queue", e);
+            } finally {
+                try{
+                    channel.socket().close();
+                } catch (IOException e) {
+                    LOG.warn("Exception while trying to close channel");
+                }
             }
         }
     }


Reply via email to