Author: fpj
Date: Wed Nov 17 14:59:29 2010
New Revision: 1036071

URL: http://svn.apache.org/viewvc?rev=1036071&view=rev
Log:
ZOOKEEPER-900. FLE implementation should be improved to use non-blocking 
sockets (vishal via fpj)


Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Nov 17 14:59:29 2010
@@ -153,6 +153,8 @@ BUGFIXES: 
 
   ZOOKEEPER-930. Hedwig c++ client uses a non thread safe logging library 
(ivan via breed)
 
+  ZOOKEEPER-900. FLE implementation should be improved to use non-blocking 
sockets (vishal via fpj)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
 Wed Nov 17 14:59:29 2010
@@ -41,7 +41,7 @@ import org.apache.zookeeper.server.quoru
 
 public class LeaderElection implements Election  {
     private static final Logger LOG = Logger.getLogger(LeaderElection.class);
-    protected static Random epochGen = new Random();
+    protected static final Random epochGen = new Random();
 
     protected QuorumPeer self;
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 Wed Nov 17 14:59:29 2010
@@ -18,15 +18,17 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Enumeration;
-import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -141,49 +143,41 @@ public class QuorumCnxManager {
      * @param sid
      */
     public void testInitiateConnection(long sid) throws Exception {
-        SocketChannel channel;
-        if(LOG.isDebugEnabled()){
-            LOG.debug("Opening channel to server "  + sid);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening channel to server " + sid);
         }
-        
-        channel = SocketChannel.open();
-        channel.socket().connect(self.getVotingView().get(sid).electionAddr, 
cnxTO);
-        channel.socket().setTcpNoDelay(true);
-        initiateConnection(channel, sid);
+        Socket sock = new Socket();
+        setSockOpts(sock);
+        sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
+        initiateConnection(sock, sid);
     }
     
     /**
      * If this server has initiated the connection, then it gives up on the
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
-
-    public boolean initiateConnection(SocketChannel s, Long sid) {
-         try {
+    public boolean initiateConnection(Socket sock, Long sid) {
+        DataOutputStream dout = null;
+        try {
             // Sending id and challenge
-            byte[] msgBytes = new byte[8];
-            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-            msgBuffer.putLong(self.getId());
-            msgBuffer.position(0);
-            s.write(msgBuffer);
+            dout = new DataOutputStream(sock.getOutputStream());
+            dout.writeLong(self.getId());
+            dout.flush();
         } catch (IOException e) {
-            LOG.warn("Exception reading or writing challenge: ", e);
+            LOG.warn("Ignoring exception reading or writing challenge: ", e);
+            closeSocket(sock);
             return false;
         }
         
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
-            try {
-                LOG.info("Have smaller server identifier, so dropping the 
connection: (" + 
-                        sid + ", " + self.getId() + ")");
-                s.socket().close();
-            } catch (IOException e) {
-                LOG.warn("Ignoring exception when closing socket or trying to "
-                        + "reopen connection: ", e);
-            }
-        // Otherwise proceed with the connection
-        } else {    
-            SendWorker sw = new SendWorker(s, sid);
-            RecvWorker rw = new RecvWorker(s, sid);
+            LOG.info("Have smaller server identifier, so dropping the " +
+                     "connection: (" + sid + ", " + self.getId() + ")");
+            closeSocket(sock);
+            // Otherwise proceed with the connection
+        } else {
+            SendWorker sw = new SendWorker(sock, sid);
+            RecvWorker rw = new RecvWorker(sock, sid);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -215,19 +209,14 @@ public class QuorumCnxManager {
      * possible long value to lose the challenge.
      * 
      */
-    boolean receiveConnection(SocketChannel s) {
+    public boolean receiveConnection(Socket sock) {
         Long sid = null;
         
         try {
-            byte[] msgBytes = new byte[8];
-            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-                
-            s.read(msgBuffer);
-            msgBuffer.position(0);
-                
             // Read server id
-            sid = Long.valueOf(msgBuffer.getLong());
-            if(sid == QuorumPeer.OBSERVER_ID){
+            DataInputStream din = new DataInputStream(sock.getInputStream());
+            sid = din.readLong();
+            if (sid == QuorumPeer.OBSERVER_ID) {
                 /*
                  * Choose identifier at random. We need a value to identify
                  * the connection.
@@ -237,38 +226,34 @@ public class QuorumCnxManager {
                 LOG.info("Setting arbitrary identifier to observer: " + sid);
             }
         } catch (IOException e) {
-            LOG.warn("Exception reading or writing challenge: "
-                    + e.toString());
+            closeSocket(sock);
+            LOG.warn("Exception reading or writing challenge: " + 
e.toString());
             return false;
         }
         
         //If wins the challenge, then close the new connection.
         if (sid < self.getId()) {
-            try {
-                /*
-                 * This replica might still believe that the connection to sid
-                 * is up, so we have to shut down the workers before trying to
-                 * open a new connection.
-                 */
-                SendWorker sw = senderWorkerMap.get(sid);
-                if(sw != null)
-                    sw.finish();
-                
-                /*
-                 * Now we start a new connection
-                 */
-                LOG.debug("Create new connection to server: " + sid);
-                s.socket().close();
-                connectOne(sid);
-                
-            } catch (IOException e) {
-                LOG.info("Error when closing socket or trying to reopen 
connection: "
-                                + e.toString());
+            /*
+             * This replica might still believe that the connection to sid is
+             * up, so we have to shut down the workers before trying to open a
+             * new connection.
+             */
+            SendWorker sw = senderWorkerMap.get(sid);
+            if (sw != null) {
+                sw.finish();
             }
-        //Otherwise start worker threads to receive data.
+
+            /*
+             * Now we start a new connection
+             */
+            LOG.debug("Create new connection to server: " + sid);
+            closeSocket(sock);
+            connectOne(sid);
+
+            // Otherwise start worker threads to receive data.
         } else {
-            SendWorker sw = new SendWorker(s, sid);
-            RecvWorker rw = new RecvWorker(s, sid);
+            SendWorker sw = new SendWorker(sock, sid);
+            RecvWorker rw = new RecvWorker(sock, sid);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -306,10 +291,10 @@ public class QuorumCnxManager {
             } catch (InterruptedException e) {
                 LOG.warn("Exception when loopbacking", e);
             }
-        /*
-         * Otherwise send to the corresponding thread to send. 
-         */
-        } else
+            /*
+             * Otherwise send to the corresponding thread to send.
+             */
+        } else {
             try {
                 /*
                  * Start a new connection if doesn't have one already.
@@ -330,14 +315,15 @@ public class QuorumCnxManager {
                     } else {
                         LOG.error("No queue for server " + sid);
                     }
-                } 
-                
+                }
+
                 connectOne(sid);
                 
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting to put message in queue.",
                         e);
-            } 
+            }
+        }
     }
     
     /**
@@ -349,31 +335,31 @@ public class QuorumCnxManager {
     synchronized void connectOne(long sid){
         if (senderWorkerMap.get(sid) == null){
             InetSocketAddress electionAddr;
-            if(self.quorumPeers.containsKey(sid))
-                electionAddr =
-                    self.quorumPeers.get(sid).electionAddr;
-            else{
+            if (self.quorumPeers.containsKey(sid)) {
+                electionAddr = self.quorumPeers.get(sid).electionAddr;
+            } else {
                 LOG.warn("Invalid server id: " + sid);
                 return;
             }
             try {
-                SocketChannel channel;
-                if(LOG.isDebugEnabled()){
-                    LOG.debug("Opening channel to server "  + sid);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Opening channel to server " + sid);
                 }
-                
-                channel = SocketChannel.open();
-                channel.socket().connect(self.getView().get(sid).electionAddr, 
cnxTO);                
-                channel.socket().setTcpNoDelay(true);
-                initiateConnection(channel, sid);
+                Socket sock = new Socket();
+                setSockOpts(sock);
+                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Connected to server " + sid);
+                }
+                initiateConnection(sock, sid);
             } catch (UnresolvedAddressException e) {
                 // Sun doesn't include the address that causes this
                 // exception to be thrown, also UAE cannot be wrapped cleanly
                 // so we log the exception in order to capture this critical
                 // detail.
                 LOG.warn("Cannot open channel to " + sid
-                        + " at election address " + electionAddr,
-                        e);
+                        + " at election address " + electionAddr, e);
                 throw e;
             } catch (IOException e) {
                 LOG.warn("Cannot open channel to " + sid
@@ -407,8 +393,9 @@ public class QuorumCnxManager {
     boolean haveDelivered() {
         for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
             LOG.debug("Queue size: " + queue.size());
-            if (queue.size() == 0)
+            if (queue.size() == 0) {
                 return true;
+            }
         }
 
         return false;
@@ -428,11 +415,36 @@ public class QuorumCnxManager {
     /**
      * A soft halt simply finishes workers.
      */
-    public void softHalt(){
-       for(SendWorker sw: senderWorkerMap.values()){
-               LOG.debug("Halting sender: " + sw);
-               sw.finish();
-       }       
+    public void softHalt() {
+        for (SendWorker sw : senderWorkerMap.values()) {
+            LOG.debug("Halting sender: " + sw);
+            sw.finish();
+        }
+    }
+
+    /**
+     * Helper method to set socket options.
+     * 
+     * @param sock
+     *            Reference to socket
+     */
+    private void setSockOpts(Socket sock) throws SocketException {
+        sock.setTcpNoDelay(true);
+        sock.setSoTimeout(self.tickTime * self.syncLimit);
+    }
+
+    /**
+     * Helper method to close a socket.
+     * 
+     * @param sock
+     *            Reference to socket
+     */
+    private void closeSocket(Socket sock) {
+        try {
+            sock.close();
+        } catch (IOException ie) {
+            LOG.error("Exception while closing", ie);
+        }
     }
 
     /**
@@ -440,7 +452,8 @@ public class QuorumCnxManager {
      */
     public class Listener extends Thread {
 
-        volatile ServerSocketChannel ss = null;
+        volatile ServerSocket ss = null;
+
         /**
          * Sleeps on accept().
          */
@@ -449,37 +462,44 @@ public class QuorumCnxManager {
             int numRetries = 0;
             while((!shutdown) && (numRetries < 3)){
                 try {
-                    ss = ServerSocketChannel.open();
-                    int port = 
self.quorumPeers.get(self.getId()).electionAddr.getPort();
-                    ss.socket().setReuseAddress(true);
+                    ss = new ServerSocket();
+                    ss.setReuseAddress(true);
+                    int port = self.quorumPeers.get(self.getId()).electionAddr
+                            .getPort();
                     InetSocketAddress addr = new InetSocketAddress(port);
                     LOG.info("My election bind port: " + addr.toString());
-                    setName(addr.toString());
-                    ss.socket().bind(addr);
-
+                    setName(self.quorumPeers.get(self.getId()).electionAddr
+                            .toString());
+                    ss.bind(addr);
                     while (!shutdown) {
-                        SocketChannel client = ss.accept();
-                        Socket sock = client.socket();
-                        sock.setTcpNoDelay(true);
-                    
-                        LOG.debug("Connection request "
-                                + sock.getRemoteSocketAddress());
-                    
-                        LOG.debug("Connection request: " + self.getId());
+                        Socket client = ss.accept();
+                        setSockOpts(client);
+                        LOG.info("Received connection request "
+                                + client.getRemoteSocketAddress());
                         receiveConnection(client);
                         numRetries = 0;
                     }
                 } catch (IOException e) {
                     LOG.error("Exception while listening", e);
                     numRetries++;
+                    try {
+                        ss.close();
+                        Thread.sleep(1000);
+                    } catch (IOException ie) {
+                        LOG.error("Error closing server socket", ie);
+                    } catch (InterruptedException ie) {
+                        LOG.error("Interrupted while sleeping. " +
+                                  "Ignoring exception", ie);
+                    }
                 }
             }
             LOG.info("Leaving listener");
-            if(!shutdown)
-                LOG.fatal("As I'm leaving the listener thread, " +
-                               "I won't be able to participate in leader " +
-                               "election any longer: " + 
-                               
self.quorumPeers.get(self.getId()).electionAddr);
+            if (!shutdown) {
+                LOG.fatal("As I'm leaving the listener thread, "
+                        + "I won't be able to participate in leader "
+                        + "election any longer: "
+                        + self.quorumPeers.get(self.getId()).electionAddr);
+            }
         }
         
         /**
@@ -505,22 +525,31 @@ public class QuorumCnxManager {
      */
     class SendWorker extends Thread {
         Long sid;
-        SocketChannel channel;
+        Socket sock;
         RecvWorker recvWorker;
         volatile boolean running = true;
+        DataOutputStream dout;
 
         /**
          * An instance of this thread receives messages to send
          * through a queue and sends them to the server sid.
          * 
-         * @param channel SocketChannel
-         * @param sid   Server identifier
+         * @param sock
+         *            Socket to remote peer
+         * @param sid
+         *            Server identifier of remote peer
          */
-        SendWorker(SocketChannel channel, Long sid) {
+        SendWorker(Socket sock, Long sid) {
             this.sid = sid;
-            this.channel = channel;
+            this.sock = sock;
             recvWorker = null;
-            
+            try {
+                dout = new DataOutputStream(sock.getOutputStream());
+            } catch (IOException e) {
+                LOG.error("Unable to access socket output stream", e);
+                closeSocket(sock);
+                running = false;
+            }
             LOG.debug("Address of remote peer: " + this.sid);
         }
 
@@ -538,8 +567,8 @@ public class QuorumCnxManager {
         }
                 
         synchronized boolean finish() {
-            if(LOG.isDebugEnabled()){
-                LOG.debug("Calling finish");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Calling finish for " + sid);
             }
             
             if(!running){
@@ -550,59 +579,59 @@ public class QuorumCnxManager {
             }
             
             running = false;
-            
-            try{
-                channel.close();
-            } catch (IOException e) {
-                LOG.warn("Exception while closing socket");
-            }
-            //channel = null;
+            closeSocket(sock);
+            // channel = null;
 
             this.interrupt();
-            if (recvWorker != null)
+            if (recvWorker != null) {
                 recvWorker.finish();
-            
-            if(LOG.isDebugEnabled()){
+            }
+
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
             }
-            senderWorkerMap.remove(sid);
+            senderWorkerMap.remove(sid, this);
             return running;
         }
         
         synchronized void send(ByteBuffer b) throws IOException {
-            byte[] msgBytes = new byte[b.capacity()
-                                       + (Integer.SIZE / 8)];
-            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-            msgBuffer.putInt(b.capacity());
-
-            msgBuffer.put(b.array(), 0, b.capacity());
-            msgBuffer.position(0);
-            if(channel != null)
-                channel.write(msgBuffer);
-            else
-                throw new IOException("SocketChannel is null");
+            byte[] msgBytes = new byte[b.capacity()];
+            try {
+                b.position(0);
+                b.get(msgBytes);
+            } catch (BufferUnderflowException be) {
+                LOG.fatal("BufferUnderflowException ", be);
+                return;
+            }
+            dout.writeInt(b.capacity());
+            dout.write(b.array());
+            dout.flush();
         }
 
         @Override
         public void run() {
-            try{
-                ByteBuffer b = lastMessageSent.get(sid); 
-                if(b != null) send(b);   
+            try {
+                ByteBuffer b = lastMessageSent.get(sid);
+                if (b != null) {
+                    send(b);
+                }
             } catch (IOException e) {
                 LOG.error("Failed to send last message. Shutting down 
thread.", e);
                 this.finish();
             }
             
             try {
-                while (running && !shutdown && channel != null) {
+                while (running && !shutdown && sock != null) {
 
                     ByteBuffer b = null;
                     try {
-                        ArrayBlockingQueue<ByteBuffer> bq = 
queueSendMap.get(sid);                    
-                        if(bq != null) 
+                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
+                                .get(sid);
+                        if (bq != null) {
                             b = bq.poll(1000, TimeUnit.MILLISECONDS);
-                        else {
-                            LOG.error("No queue of incoming messages for 
server " + sid);
+                        } else {
+                            LOG.error("No queue of incoming messages for " +
+                                      "server " + sid);
                             break;
                         }
 
@@ -630,12 +659,22 @@ public class QuorumCnxManager {
      */
     class RecvWorker extends Thread {
         Long sid;
-        SocketChannel channel;
+        Socket sock;
         volatile boolean running = true;
+        DataInputStream din;
 
-        RecvWorker(SocketChannel channel, Long sid) {
+        RecvWorker(Socket sock, Long sid) {
             this.sid = sid;
-            this.channel = channel;
+            this.sock = sock;
+            try {
+                din = new DataInputStream(sock.getInputStream());
+                // OK to wait until socket disconnects while reading.
+                sock.setSoTimeout(0);
+            } catch (IOException e) {
+                LOG.error("Error while accessing socket for " + sid, e);
+                closeSocket(sock);
+                running = false;
+            }
         }
         
         /**
@@ -660,54 +699,33 @@ public class QuorumCnxManager {
         public void run() {
             try {
                 byte[] size = new byte[4];
-                ByteBuffer msgLength = ByteBuffer.wrap(size);
-                while (running && !shutdown && channel != null) {
+                while (running && !shutdown && sock != null) {
                     /**
                      * Reads the first int to determine the length of the
                      * message
                      */
-                    while (msgLength.hasRemaining()) {
-                        if (channel.read(msgLength) < 0) {
-                            throw new IOException("Channel eof");
-                        }
-                    }
-                    msgLength.position(0);
-                    int length = msgLength.getInt();
-                    if(length <= 0) {
-                        throw new IOException("Invalid packet length:" + 
length);
+                    int length = din.readInt();
+                    if (length <= 0 || length > PACKETMAXSIZE) {
+                        throw new IOException(
+                                "Received packet with invalid packet: "
+                                        + length);
                     }
                     /**
                      * Allocates a new ByteBuffer to receive the message
                      */
-                    if (length > PACKETMAXSIZE) {
-                        throw new IOException("Invalid packet of length " + 
length);
-                    }
                     byte[] msgArray = new byte[length];
+                    din.readFully(msgArray, 0, length);
                     ByteBuffer message = ByteBuffer.wrap(msgArray);
-                    int numbytes = 0;
-                    int temp_numbytes = 0;
-                    while (message.hasRemaining()) {
-                        temp_numbytes = channel.read(message); 
-                        if(temp_numbytes < 0) {
-                            throw new IOException("Channel eof before end");
-                        }
-                        numbytes += temp_numbytes;
-                    }
-                    message.position(0);
                     synchronized (recvQueue) {
-                        recvQueue
-                        .put(new Message(message.duplicate(), sid));
+                        recvQueue.put(new Message(message.duplicate(), sid));
                     }
-                    msgLength.position(0);
                 }
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                         self.getId() + ", error = " + e);
             } finally {
-                try{
-                    channel.socket().close();
-                } catch (IOException e) {
-                    LOG.warn("Exception while trying to close channel");
+                if (sock != null) {
+                    closeSocket(sock);
                 }
             }
         }

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
 Wed Nov 17 14:59:29 2010
@@ -22,9 +22,11 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.net.Socket;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.io.*;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
@@ -165,6 +167,7 @@ public class CnxManagerTest extends ZKTe
         }
         
     }
+
     @Test
     public void testCnxManagerTimeout() throws Exception {
         Random rand = new Random();
@@ -256,4 +259,31 @@ public class CnxManagerTest extends ZKTe
             LOG.info("Socket has been closed as expected");
         }
     }
-}
\ No newline at end of file
+
+    /*
+     * Test if a receiveConnection is able to timeout on socket errors
+     */
+    @Test
+    public void testSocketTimeout() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 
3, 1, 2000, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        Thread.sleep(1000);
+        
+        Socket sock = new Socket();
+        sock.connect(peers.get(new Long(1)).electionAddr, 5000);
+        long begin = System.currentTimeMillis();
+        // Read without sending data. Verify timeout.
+        cnxManager.receiveConnection(sock);
+        long end = System.currentTimeMillis();
+        if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) 
Assert.fail("Waited more than necessary");
+    }
+}


Reply via email to