Author: fpj Date: Wed Nov 17 14:59:29 2010 New Revision: 1036071 URL: http://svn.apache.org/viewvc?rev=1036071&view=rev Log: ZOOKEEPER-900. FLE implementation should be improved to use non-blocking sockets (vishal via fpj)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1036071&r1=1036070&r2=1036071&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Wed Nov 17 14:59:29 2010 @@ -153,6 +153,8 @@ BUGFIXES: ZOOKEEPER-930. Hedwig c++ client uses a non thread safe logging library (ivan via breed) + ZOOKEEPER-900. FLE implementation should be improved to use non-blocking sockets (vishal via fpj) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=1036071&r1=1036070&r2=1036071&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Wed Nov 17 14:59:29 2010 @@ -41,7 +41,7 @@ import org.apache.zookeeper.server.quoru public class LeaderElection implements Election { private static final Logger LOG = Logger.getLogger(LeaderElection.class); - protected static Random epochGen = new Random(); + protected static final Random epochGen = new Random(); protected QuorumPeer self; Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1036071&r1=1036070&r2=1036071&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Nov 17 14:59:29 2010 @@ -18,15 +18,17 @@ package org.apache.zookeeper.server.quorum; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.util.Enumeration; -import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -141,49 +143,41 @@ public class QuorumCnxManager { * @param sid */ public void testInitiateConnection(long sid) throws Exception { - SocketChannel channel; - if(LOG.isDebugEnabled()){ - LOG.debug("Opening channel to server " + sid); + if (LOG.isDebugEnabled()) { + LOG.debug("Opening channel to server " + sid); } - - channel = SocketChannel.open(); - channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO); - channel.socket().setTcpNoDelay(true); - initiateConnection(channel, sid); + Socket sock = new Socket(); + setSockOpts(sock); + sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO); + initiateConnection(sock, sid); } /** * If this server has initiated the connection, then it gives up on the * connection if it loses challenge. Otherwise, it keeps the connection. */ - - public boolean initiateConnection(SocketChannel s, Long sid) { - try { + public boolean initiateConnection(Socket sock, Long sid) { + DataOutputStream dout = null; + try { // Sending id and challenge - byte[] msgBytes = new byte[8]; - ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); - msgBuffer.putLong(self.getId()); - msgBuffer.position(0); - s.write(msgBuffer); + dout = new DataOutputStream(sock.getOutputStream()); + dout.writeLong(self.getId()); + dout.flush(); } catch (IOException e) { - LOG.warn("Exception reading or writing challenge: ", e); + LOG.warn("Ignoring exception reading or writing challenge: ", e); + closeSocket(sock); return false; } // If lost the challenge, then drop the new connection if (sid > self.getId()) { - try { - LOG.info("Have smaller server identifier, so dropping the connection: (" + - sid + ", " + self.getId() + ")"); - s.socket().close(); - } catch (IOException e) { - LOG.warn("Ignoring exception when closing socket or trying to " - + "reopen connection: ", e); - } - // Otherwise proceed with the connection - } else { - SendWorker sw = new SendWorker(s, sid); - RecvWorker rw = new RecvWorker(s, sid); + LOG.info("Have smaller server identifier, so dropping the " + + "connection: (" + sid + ", " + self.getId() + ")"); + closeSocket(sock); + // Otherwise proceed with the connection + } else { + SendWorker sw = new SendWorker(sock, sid); + RecvWorker rw = new RecvWorker(sock, sid); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); @@ -215,19 +209,14 @@ public class QuorumCnxManager { * possible long value to lose the challenge. * */ - boolean receiveConnection(SocketChannel s) { + public boolean receiveConnection(Socket sock) { Long sid = null; try { - byte[] msgBytes = new byte[8]; - ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); - - s.read(msgBuffer); - msgBuffer.position(0); - // Read server id - sid = Long.valueOf(msgBuffer.getLong()); - if(sid == QuorumPeer.OBSERVER_ID){ + DataInputStream din = new DataInputStream(sock.getInputStream()); + sid = din.readLong(); + if (sid == QuorumPeer.OBSERVER_ID) { /* * Choose identifier at random. We need a value to identify * the connection. @@ -237,38 +226,34 @@ public class QuorumCnxManager { LOG.info("Setting arbitrary identifier to observer: " + sid); } } catch (IOException e) { - LOG.warn("Exception reading or writing challenge: " - + e.toString()); + closeSocket(sock); + LOG.warn("Exception reading or writing challenge: " + e.toString()); return false; } //If wins the challenge, then close the new connection. if (sid < self.getId()) { - try { - /* - * This replica might still believe that the connection to sid - * is up, so we have to shut down the workers before trying to - * open a new connection. - */ - SendWorker sw = senderWorkerMap.get(sid); - if(sw != null) - sw.finish(); - - /* - * Now we start a new connection - */ - LOG.debug("Create new connection to server: " + sid); - s.socket().close(); - connectOne(sid); - - } catch (IOException e) { - LOG.info("Error when closing socket or trying to reopen connection: " - + e.toString()); + /* + * This replica might still believe that the connection to sid is + * up, so we have to shut down the workers before trying to open a + * new connection. + */ + SendWorker sw = senderWorkerMap.get(sid); + if (sw != null) { + sw.finish(); } - //Otherwise start worker threads to receive data. + + /* + * Now we start a new connection + */ + LOG.debug("Create new connection to server: " + sid); + closeSocket(sock); + connectOne(sid); + + // Otherwise start worker threads to receive data. } else { - SendWorker sw = new SendWorker(s, sid); - RecvWorker rw = new RecvWorker(s, sid); + SendWorker sw = new SendWorker(sock, sid); + RecvWorker rw = new RecvWorker(sock, sid); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); @@ -306,10 +291,10 @@ public class QuorumCnxManager { } catch (InterruptedException e) { LOG.warn("Exception when loopbacking", e); } - /* - * Otherwise send to the corresponding thread to send. - */ - } else + /* + * Otherwise send to the corresponding thread to send. + */ + } else { try { /* * Start a new connection if doesn't have one already. @@ -330,14 +315,15 @@ public class QuorumCnxManager { } else { LOG.error("No queue for server " + sid); } - } - + } + connectOne(sid); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting to put message in queue.", e); - } + } + } } /** @@ -349,31 +335,31 @@ public class QuorumCnxManager { synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) == null){ InetSocketAddress electionAddr; - if(self.quorumPeers.containsKey(sid)) - electionAddr = - self.quorumPeers.get(sid).electionAddr; - else{ + if (self.quorumPeers.containsKey(sid)) { + electionAddr = self.quorumPeers.get(sid).electionAddr; + } else { LOG.warn("Invalid server id: " + sid); return; } try { - SocketChannel channel; - if(LOG.isDebugEnabled()){ - LOG.debug("Opening channel to server " + sid); + + if (LOG.isDebugEnabled()) { + LOG.debug("Opening channel to server " + sid); } - - channel = SocketChannel.open(); - channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO); - channel.socket().setTcpNoDelay(true); - initiateConnection(channel, sid); + Socket sock = new Socket(); + setSockOpts(sock); + sock.connect(self.getView().get(sid).electionAddr, cnxTO); + if (LOG.isDebugEnabled()) { + LOG.debug("Connected to server " + sid); + } + initiateConnection(sock, sid); } catch (UnresolvedAddressException e) { // Sun doesn't include the address that causes this // exception to be thrown, also UAE cannot be wrapped cleanly // so we log the exception in order to capture this critical // detail. LOG.warn("Cannot open channel to " + sid - + " at election address " + electionAddr, - e); + + " at election address " + electionAddr, e); throw e; } catch (IOException e) { LOG.warn("Cannot open channel to " + sid @@ -407,8 +393,9 @@ public class QuorumCnxManager { boolean haveDelivered() { for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) { LOG.debug("Queue size: " + queue.size()); - if (queue.size() == 0) + if (queue.size() == 0) { return true; + } } return false; @@ -428,11 +415,36 @@ public class QuorumCnxManager { /** * A soft halt simply finishes workers. */ - public void softHalt(){ - for(SendWorker sw: senderWorkerMap.values()){ - LOG.debug("Halting sender: " + sw); - sw.finish(); - } + public void softHalt() { + for (SendWorker sw : senderWorkerMap.values()) { + LOG.debug("Halting sender: " + sw); + sw.finish(); + } + } + + /** + * Helper method to set socket options. + * + * @param sock + * Reference to socket + */ + private void setSockOpts(Socket sock) throws SocketException { + sock.setTcpNoDelay(true); + sock.setSoTimeout(self.tickTime * self.syncLimit); + } + + /** + * Helper method to close a socket. + * + * @param sock + * Reference to socket + */ + private void closeSocket(Socket sock) { + try { + sock.close(); + } catch (IOException ie) { + LOG.error("Exception while closing", ie); + } } /** @@ -440,7 +452,8 @@ public class QuorumCnxManager { */ public class Listener extends Thread { - volatile ServerSocketChannel ss = null; + volatile ServerSocket ss = null; + /** * Sleeps on accept(). */ @@ -449,37 +462,44 @@ public class QuorumCnxManager { int numRetries = 0; while((!shutdown) && (numRetries < 3)){ try { - ss = ServerSocketChannel.open(); - int port = self.quorumPeers.get(self.getId()).electionAddr.getPort(); - ss.socket().setReuseAddress(true); + ss = new ServerSocket(); + ss.setReuseAddress(true); + int port = self.quorumPeers.get(self.getId()).electionAddr + .getPort(); InetSocketAddress addr = new InetSocketAddress(port); LOG.info("My election bind port: " + addr.toString()); - setName(addr.toString()); - ss.socket().bind(addr); - + setName(self.quorumPeers.get(self.getId()).electionAddr + .toString()); + ss.bind(addr); while (!shutdown) { - SocketChannel client = ss.accept(); - Socket sock = client.socket(); - sock.setTcpNoDelay(true); - - LOG.debug("Connection request " - + sock.getRemoteSocketAddress()); - - LOG.debug("Connection request: " + self.getId()); + Socket client = ss.accept(); + setSockOpts(client); + LOG.info("Received connection request " + + client.getRemoteSocketAddress()); receiveConnection(client); numRetries = 0; } } catch (IOException e) { LOG.error("Exception while listening", e); numRetries++; + try { + ss.close(); + Thread.sleep(1000); + } catch (IOException ie) { + LOG.error("Error closing server socket", ie); + } catch (InterruptedException ie) { + LOG.error("Interrupted while sleeping. " + + "Ignoring exception", ie); + } } } LOG.info("Leaving listener"); - if(!shutdown) - LOG.fatal("As I'm leaving the listener thread, " + - "I won't be able to participate in leader " + - "election any longer: " + - self.quorumPeers.get(self.getId()).electionAddr); + if (!shutdown) { + LOG.fatal("As I'm leaving the listener thread, " + + "I won't be able to participate in leader " + + "election any longer: " + + self.quorumPeers.get(self.getId()).electionAddr); + } } /** @@ -505,22 +525,31 @@ public class QuorumCnxManager { */ class SendWorker extends Thread { Long sid; - SocketChannel channel; + Socket sock; RecvWorker recvWorker; volatile boolean running = true; + DataOutputStream dout; /** * An instance of this thread receives messages to send * through a queue and sends them to the server sid. * - * @param channel SocketChannel - * @param sid Server identifier + * @param sock + * Socket to remote peer + * @param sid + * Server identifier of remote peer */ - SendWorker(SocketChannel channel, Long sid) { + SendWorker(Socket sock, Long sid) { this.sid = sid; - this.channel = channel; + this.sock = sock; recvWorker = null; - + try { + dout = new DataOutputStream(sock.getOutputStream()); + } catch (IOException e) { + LOG.error("Unable to access socket output stream", e); + closeSocket(sock); + running = false; + } LOG.debug("Address of remote peer: " + this.sid); } @@ -538,8 +567,8 @@ public class QuorumCnxManager { } synchronized boolean finish() { - if(LOG.isDebugEnabled()){ - LOG.debug("Calling finish"); + if (LOG.isDebugEnabled()) { + LOG.debug("Calling finish for " + sid); } if(!running){ @@ -550,59 +579,59 @@ public class QuorumCnxManager { } running = false; - - try{ - channel.close(); - } catch (IOException e) { - LOG.warn("Exception while closing socket"); - } - //channel = null; + closeSocket(sock); + // channel = null; this.interrupt(); - if (recvWorker != null) + if (recvWorker != null) { recvWorker.finish(); - - if(LOG.isDebugEnabled()){ + } + + if (LOG.isDebugEnabled()) { LOG.debug("Removing entry from senderWorkerMap sid=" + sid); } - senderWorkerMap.remove(sid); + senderWorkerMap.remove(sid, this); return running; } synchronized void send(ByteBuffer b) throws IOException { - byte[] msgBytes = new byte[b.capacity() - + (Integer.SIZE / 8)]; - ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); - msgBuffer.putInt(b.capacity()); - - msgBuffer.put(b.array(), 0, b.capacity()); - msgBuffer.position(0); - if(channel != null) - channel.write(msgBuffer); - else - throw new IOException("SocketChannel is null"); + byte[] msgBytes = new byte[b.capacity()]; + try { + b.position(0); + b.get(msgBytes); + } catch (BufferUnderflowException be) { + LOG.fatal("BufferUnderflowException ", be); + return; + } + dout.writeInt(b.capacity()); + dout.write(b.array()); + dout.flush(); } @Override public void run() { - try{ - ByteBuffer b = lastMessageSent.get(sid); - if(b != null) send(b); + try { + ByteBuffer b = lastMessageSent.get(sid); + if (b != null) { + send(b); + } } catch (IOException e) { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } try { - while (running && !shutdown && channel != null) { + while (running && !shutdown && sock != null) { ByteBuffer b = null; try { - ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); - if(bq != null) + ArrayBlockingQueue<ByteBuffer> bq = queueSendMap + .get(sid); + if (bq != null) { b = bq.poll(1000, TimeUnit.MILLISECONDS); - else { - LOG.error("No queue of incoming messages for server " + sid); + } else { + LOG.error("No queue of incoming messages for " + + "server " + sid); break; } @@ -630,12 +659,22 @@ public class QuorumCnxManager { */ class RecvWorker extends Thread { Long sid; - SocketChannel channel; + Socket sock; volatile boolean running = true; + DataInputStream din; - RecvWorker(SocketChannel channel, Long sid) { + RecvWorker(Socket sock, Long sid) { this.sid = sid; - this.channel = channel; + this.sock = sock; + try { + din = new DataInputStream(sock.getInputStream()); + // OK to wait until socket disconnects while reading. + sock.setSoTimeout(0); + } catch (IOException e) { + LOG.error("Error while accessing socket for " + sid, e); + closeSocket(sock); + running = false; + } } /** @@ -660,54 +699,33 @@ public class QuorumCnxManager { public void run() { try { byte[] size = new byte[4]; - ByteBuffer msgLength = ByteBuffer.wrap(size); - while (running && !shutdown && channel != null) { + while (running && !shutdown && sock != null) { /** * Reads the first int to determine the length of the * message */ - while (msgLength.hasRemaining()) { - if (channel.read(msgLength) < 0) { - throw new IOException("Channel eof"); - } - } - msgLength.position(0); - int length = msgLength.getInt(); - if(length <= 0) { - throw new IOException("Invalid packet length:" + length); + int length = din.readInt(); + if (length <= 0 || length > PACKETMAXSIZE) { + throw new IOException( + "Received packet with invalid packet: " + + length); } /** * Allocates a new ByteBuffer to receive the message */ - if (length > PACKETMAXSIZE) { - throw new IOException("Invalid packet of length " + length); - } byte[] msgArray = new byte[length]; + din.readFully(msgArray, 0, length); ByteBuffer message = ByteBuffer.wrap(msgArray); - int numbytes = 0; - int temp_numbytes = 0; - while (message.hasRemaining()) { - temp_numbytes = channel.read(message); - if(temp_numbytes < 0) { - throw new IOException("Channel eof before end"); - } - numbytes += temp_numbytes; - } - message.position(0); synchronized (recvQueue) { - recvQueue - .put(new Message(message.duplicate(), sid)); + recvQueue.put(new Message(message.duplicate(), sid)); } - msgLength.position(0); } } catch (Exception e) { LOG.warn("Connection broken for id " + sid + ", my id = " + self.getId() + ", error = " + e); } finally { - try{ - channel.socket().close(); - } catch (IOException e) { - LOG.warn("Exception while trying to close channel"); + if (sock != null) { + closeSocket(sock); } } } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1036071&r1=1036070&r2=1036071&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Nov 17 14:59:29 2010 @@ -22,9 +22,11 @@ import java.io.File; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.net.Socket; import java.util.HashMap; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.io.*; import org.apache.log4j.Logger; import org.apache.zookeeper.PortAssignment; @@ -165,6 +167,7 @@ public class CnxManagerTest extends ZKTe } } + @Test public void testCnxManagerTimeout() throws Exception { Random rand = new Random(); @@ -256,4 +259,31 @@ public class CnxManagerTest extends ZKTe LOG.info("Socket has been closed as expected"); } } -} \ No newline at end of file + + /* + * Test if a receiveConnection is able to timeout on socket errors + */ + @Test + public void testSocketTimeout() throws Exception { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + int port = peers.get(peer.getId()).electionAddr.getPort(); + LOG.info("Election port: " + port); + InetSocketAddress addr = new InetSocketAddress(port); + Thread.sleep(1000); + + Socket sock = new Socket(); + sock.connect(peers.get(new Long(1)).electionAddr, 5000); + long begin = System.currentTimeMillis(); + // Read without sending data. Verify timeout. + cnxManager.receiveConnection(sock); + long end = System.currentTimeMillis(); + if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary"); + } +}