Author: mahadev Date: Tue Oct 27 21:14:00 2009 New Revision: 830340 URL: http://svn.apache.org/viewvc?rev=830340&view=rev Log: ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=830340&r1=830339&r2=830340&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 27 21:14:00 2009 @@ -92,6 +92,8 @@ ZOOKEEPER-554. zkpython can segfault when statting a deleted node (henry robinson via phunt) + + ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev) IMPROVEMENTS: ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=830340&r1=830339&r2=830340&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Oct 27 21:14:00 2009 @@ -28,6 +28,7 @@ import java.util.Enumeration; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -314,9 +315,15 @@ */ synchronized void connectOne(long sid){ - if (senderWorkerMap.get(sid) == null){ - InetSocketAddress electionAddr = - self.quorumPeers.get(sid).electionAddr; + if (senderWorkerMap.get(sid) == null){ + InetSocketAddress electionAddr; + if(self.quorumPeers.containsKey(sid)) + electionAddr = + self.quorumPeers.get(sid).electionAddr; + else{ + LOG.warn("Invalid server id: " + sid); + return; + } try { SocketChannel channel; LOG.debug("Opening channel to server " + sid); @@ -403,27 +410,38 @@ */ @Override public void run() { - try { - ss = ServerSocketChannel.open(); - int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); - LOG.info("My election bind port: " + port); - ss.socket().setReuseAddress(true); - ss.socket().bind(new InetSocketAddress(port)); - - while (!shutdown) { - SocketChannel client = ss.accept(); - Socket sock = client.socket(); - sock.setTcpNoDelay(true); + int numRetries = 0; + while((!shutdown) && (numRetries < 3)){ + try { + ss = ServerSocketChannel.open(); + int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); + LOG.info("My election bind port: " + port); + ss.socket().setReuseAddress(true); + ss.socket().bind(new InetSocketAddress(port)); + + while (!shutdown) { + SocketChannel client = ss.accept(); + Socket sock = client.socket(); + sock.setTcpNoDelay(true); - LOG.debug("Connection request " - + sock.getRemoteSocketAddress()); + LOG.debug("Connection request " + + sock.getRemoteSocketAddress()); - LOG.debug("Connection request: " + self.getId()); - receiveConnection(client); + LOG.debug("Connection request: " + self.getId()); + receiveConnection(client); + numRetries = 0; + } + } catch (IOException e) { + LOG.error("Exception while listening", e); + numRetries++; } - } catch (IOException e) { - LOG.error("Listener.run: " + e.getMessage()); } + LOG.info("Leaving listener"); + if(!shutdown) + LOG.fatal("As I'm leaving the listener thread, " + + "I won't be able to participate in leader " + + "election any longer: " + + self.quorumPeers.get(self.getId()).electionAddr); } /** @@ -530,7 +548,7 @@ try { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if(bq != null) - b = bq.take(); + b = bq.poll(1000, TimeUnit.MILLISECONDS); else { LOG.error("No queue of incoming messages for server " + sid); this.finish(); @@ -541,11 +559,11 @@ continue; } - if(b != null) - lastMessageSent.put(sid, b); - try { - if(b != null) send(b); + if(b != null){ + lastMessageSent.put(sid, b); + send(b); + } } catch (Exception e) { LOG.warn("Exception when using channel: " + sid, e); this.finish(); @@ -617,11 +635,14 @@ } } - } catch (IOException e) { + } catch (Exception e) { LOG.warn("Connection broken: ", e); - } catch (InterruptedException e) { - LOG.warn("Interrupted while trying to add new " - + "message to the reception queue", e); + } finally { + try{ + channel.socket().close(); + } catch (IOException e) { + LOG.warn("Exception while trying to close channel"); + } } } }