Author: mahadev Date: Tue May 4 21:45:13 2010 New Revision: 941056 URL: http://svn.apache.org/viewvc?rev=941056&view=rev Log: ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue May 4 21:45:13 2010 @@ -44,6 +44,8 @@ BUGFIXES: ZOOKEEPER-758. zkpython segfaults on invalid acl with missing key (Kapil Thangavelu via henryr) + ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue May 4 21:45:13 2010 @@ -39,7 +39,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -386,6 +385,29 @@ public class NIOServerCnxn implements Wa sendBuffer(closeConn); } + /** + * send buffer without using the asynchronous + * calls to selector and then close the socket + * @param bb + */ + void sendBufferSync(ByteBuffer bb) { + try { + /* configure socket to be blocking + * so that we dont have to do write in + * a tight while loop + */ + sock.configureBlocking(true); + if (bb != closeConn) { + if (sock != null) { + sock.write(bb); + } + packetSent(); + } + } catch (IOException ie) { + LOG.error("Error sending data synchronously ", ie); + } + } + void sendBuffer(ByteBuffer bb) { try { if (bb != closeConn) { @@ -497,6 +519,11 @@ public class NIOServerCnxn implements Wa if (isPayload) { // not the case for 4letterword readPayload(); } + else { + // four letter words take care + // need not do anything else + return; + } } } if (k.isWritable()) { @@ -886,6 +913,30 @@ public class NIOServerCnxn implements Wa } /** + * clean up the socket related to a command and also make sure we flush the + * data before we do that + * + * @param pwriter + * the pwriter for a command socket + */ + private void cleanupWriterSocket(PrintWriter pwriter) { + try { + if (pwriter != null) { + pwriter.flush(); + pwriter.close(); + } + } catch (Exception e) { + LOG.info("Error closing PrintWriter ", e); + } finally { + try { + close(); + } catch (Exception e) { + LOG.error("Error closing a command socket ", e); + } + } + } + + /** * This class wraps the sendBuffer method of NIOServerCnxn. It is * responsible for chunking up the response to a client. Rather * than cons'ing up a response fully in memory, which may be large @@ -893,50 +944,23 @@ public class NIOServerCnxn implements Wa */ private class SendBufferWriter extends Writer { private StringBuffer sb = new StringBuffer(); - - /* FYI: clearing the READ interestOps on the key results in - * the cnxn being closed in doIO. - */ - + /** * Check if we are ready to send another chunk. * @param force force sending, even if not a full chunk */ private void checkFlush(boolean force) { if ((force && sb.length() > 0) || sb.length() > 2048) { - sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); - // including op_read keeps doio from closing the conn - wakeup(SelectionKey.OP_READ - | SelectionKey.OP_WRITE); - + sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes())); // clear our internal buffer sb.setLength(0); } } - /** - * Wakeup the selector. This is necessary as the cnxn is - * waiting for interestOps to be satisfied. If we want the - * selector to wakeup immediately (rather than the last - * select(timeout) period) we need to force a wakeup. - * @param sel the new interest ops - */ - private void wakeup(int sel) { - synchronized(factory) { - sk.selector().wakeup(); - sk.interestOps(sel); - } - } - @Override public void close() throws IOException { if (sb == null) return; - checkFlush(true); - - // nothing left, please close - wakeup(SelectionKey.OP_WRITE); - sb = null; // clear out the ref to ensure no reuse } @@ -954,10 +978,254 @@ public class NIOServerCnxn implements Wa private static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests"; + + /** + * Set of threads for commmand ports. All the 4 + * letter commands are run via a thread. Each class + * maps to a correspoding 4 letter command. CommandThread + * is the abstract class from which all the others inherit. + */ + private abstract class CommandThread extends Thread { + PrintWriter pw; + + CommandThread(PrintWriter pw) { + this.pw = pw; + } + + public void run() { + try { + commandRun(); + } catch (IOException ie) { + LOG.error("Error in running command ", ie); + } finally { + cleanupWriterSocket(pw); + } + } + + public abstract void commandRun() throws IOException; + } + + private class RuokCommand extends CommandThread { + public RuokCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + pw.print("imok"); + + } + } + + private class TraceMaskCommand extends CommandThread { + TraceMaskCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + long traceMask = ZooTrace.getTextTraceLevel(); + pw.print(traceMask); + } + } + + private class SetTraceMaskCommand extends CommandThread { + long trace = 0; + SetTraceMaskCommand(PrintWriter pw, long trace) { + super(pw); + this.trace = trace; + } + + @Override + public void commandRun() { + pw.print(trace); + } + } + + private class EnvCommand extends CommandThread { + EnvCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + List<Environment.Entry> env = Environment.list(); + + pw.println("Environment:"); + for(Environment.Entry e : env) { + pw.print(e.getKey()); + pw.print("="); + pw.println(e.getValue()); + } + + } + } + + private class ConfCommand extends CommandThread { + ConfCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + zk.dumpConf(pw); + } + } + } + + private class StatResetCommand extends CommandThread { + public StatResetCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + zk.serverStats().reset(); + pw.println("Server stats reset."); + } + } + } + + private class CnxnStatResetCommand extends CommandThread { + public CnxnStatResetCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + synchronized(factory.cnxns){ + for(NIOServerCnxn c : factory.cnxns){ + c.getStats().reset(); + } + } + pw.println("Connection stats reset."); + } + } + } + + private class DumpCommand extends CommandThread { + public DumpCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + pw.println("SessionTracker dump:"); + zk.sessionTracker.dumpSessions(pw); + pw.println("ephemeral nodes dump:"); + zk.dumpEphemerals(pw); + } + } + } + + private class StatCommand extends CommandThread { + int len; + public StatCommand(PrintWriter pw, int len) { + super(pw); + this.len = len; + } + + @SuppressWarnings("unchecked") + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + pw.print("Zookeeper version: "); + pw.println(Version.getFullVersion()); + if (len == statCmd) { + LOG.info("Stat command output"); + pw.println("Clients:"); + // clone should be faster than iteration + // ie give up the cnxns lock faster + HashSet<NIOServerCnxn> cnxnset; + synchronized(factory.cnxns){ + cnxnset = (HashSet<NIOServerCnxn>)factory + .cnxns.clone(); + } + for(NIOServerCnxn c : cnxnset){ + ((CnxnStats)c.getStats()) + .dumpConnectionInfo(pw, true); + } + pw.println(); + } + pw.print(zk.serverStats().toString()); + pw.print("Node count: "); + pw.println(zk.getZKDatabase().getNodeCount()); + } + + } + } + + private class ConsCommand extends CommandThread { + public ConsCommand(PrintWriter pw) { + super(pw); + } + + @SuppressWarnings("unchecked") + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + // clone should be faster than iteration + // ie give up the cnxns lock faster + HashSet<NIOServerCnxn> cnxns; + synchronized (factory.cnxns) { + cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone(); + } + for (NIOServerCnxn c : cnxns) { + ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false); + } + pw.println(); + } + } + } + + private class WatchCommand extends CommandThread { + int len = 0; + public WatchCommand(PrintWriter pw, int len) { + super(pw); + this.len = len; + } + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + DataTree dt = zk.getZKDatabase().getDataTree(); + if (len == wchsCmd) { + dt.dumpWatchesSummary(pw); + } else if (len == wchpCmd) { + dt.dumpWatches(pw, true); + } else { + dt.dumpWatches(pw, false); + } + pw.println(); + } + } + } + + /** Return if four letter word found and responded to, otw false **/ private boolean checkFourLetterWord(final SelectionKey k, final int len) - throws IOException + throws IOException { // We take advantage of the limited size of the length to look // for cmds. They are all 4-bytes which fits inside of an int @@ -969,197 +1237,77 @@ public class NIOServerCnxn implements Wa + sock.socket().getRemoteSocketAddress()); packetReceived(); + /** cancel the selection key to remove the socket handling + * from selector. This is to prevent netcat problem wherein + * netcat immediately closes the sending side after sending the + * commands and still keeps the receiving channel open. + * The idea is to remove the selectionkey from the selector + * so that the selector does not notice the closed read on the + * socket channel and keep the socket alive to write the data to + * and makes sure to close the socket after its done writing the data + */ + if (k != null) { + try { + k.cancel(); + } catch(Exception e) { + LOG.error("Error cancelling command selection key ", e); + } + } + final PrintWriter pwriter = new PrintWriter( new BufferedWriter(new SendBufferWriter())); - boolean threadWillClosePWriter = false; - try { - if (len == ruokCmd) { - pwriter.print("imok"); - return true; - } else if (len == getTraceMaskCmd) { - long traceMask = ZooTrace.getTextTraceLevel(); - pwriter.print(traceMask); - return true; - } else if (len == setTraceMaskCmd) { - int rc = sock.read(incomingBuffer); - if (rc < 0) { - throw new IOException("Read error"); - } - - incomingBuffer.flip(); - long traceMask = incomingBuffer.getLong(); - ZooTrace.setTextTraceLevel(traceMask); - pwriter.print(traceMask); - return true; - } else if (len == enviCmd) { - List<Environment.Entry> env = Environment.list(); - - pwriter.println("Environment:"); - for(Environment.Entry e : env) { - pwriter.print(e.getKey()); - pwriter.print("="); - pwriter.println(e.getValue()); - } - return true; - } else if (len == confCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - zk.dumpConf(pwriter); - return true; - } else if (len == srstCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - zk.serverStats().reset(); - pwriter.println("Server stats reset."); - return true; - } else if (len == crstCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - synchronized(factory.cnxns){ - for(NIOServerCnxn c : factory.cnxns){ - c.getStats().reset(); - } - } - pwriter.println("Connection stats reset."); - return true; - } else if (len == dumpCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @Override - public void run() { - try { - pwriter.println("SessionTracker dump:"); - zk.sessionTracker.dumpSessions(pwriter); - pwriter.println("ephemeral nodes dump:"); - zk.dumpEphemerals(pwriter); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - - return true; - } else if (len == statCmd || len == srvrCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @SuppressWarnings("unchecked") - @Override - public void run() { - try { - pwriter.print("Zookeeper version: "); - pwriter.println(Version.getFullVersion()); - if (len == statCmd) { - pwriter.println("Clients:"); - // clone should be faster than iteration - // ie give up the cnxns lock faster - HashSet<NIOServerCnxn> cnxns; - synchronized(factory.cnxns){ - cnxns = (HashSet<NIOServerCnxn>)factory - .cnxns.clone(); - } - for(NIOServerCnxn c : cnxns){ - ((CnxnStats)c.getStats()) - .dumpConnectionInfo(pwriter, true); - } - pwriter.println(); - } - pwriter.print(zk.serverStats().toString()); - pwriter.print("Node count: "); - pwriter.println(zk.getZKDatabase().getNodeCount()); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } else if (len == consCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @SuppressWarnings("unchecked") - @Override - public void run() { - try { - // clone should be faster than iteration - // ie give up the cnxns lock faster - HashSet<NIOServerCnxn> cnxns; - synchronized(factory.cnxns){ - cnxns = (HashSet<NIOServerCnxn>)factory - .cnxns.clone(); - } - for(NIOServerCnxn c : cnxns){ - ((CnxnStats)c.getStats()) - .dumpConnectionInfo(pwriter, false); - } - pwriter.println(); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @Override - public void run() { - try { - DataTree dt = zk.getZKDatabase().getDataTree(); - if (len == wchsCmd) { - dt.dumpWatchesSummary(pwriter); - } else if (len == wchpCmd) { - dt.dumpWatches(pwriter, true); - } else { - dt.dumpWatches(pwriter, false); - } - pwriter.println(); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } - } finally { - // if we spawned a thread it is responsible for eventually - // flushing and closeing the writer - if (!threadWillClosePWriter) { - pwriter.flush(); - pwriter.close(); + if (len == ruokCmd) { + RuokCommand ruok = new RuokCommand(pwriter); + ruok.start(); + return true; + } else if (len == getTraceMaskCmd) { + TraceMaskCommand tmask = new TraceMaskCommand(pwriter); + tmask.start(); + return true; + } else if (len == setTraceMaskCmd) { + int rc = sock.read(incomingBuffer); + if (rc < 0) { + throw new IOException("Read error"); } + + incomingBuffer.flip(); + long traceMask = incomingBuffer.getLong(); + ZooTrace.setTextTraceLevel(traceMask); + SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask); + setMask.start(); + return true; + } else if (len == enviCmd) { + EnvCommand env = new EnvCommand(pwriter); + env.start(); + return true; + } else if (len == confCmd) { + ConfCommand ccmd = new ConfCommand(pwriter); + ccmd.start(); + return true; + } else if (len == srstCmd) { + StatResetCommand strst = new StatResetCommand(pwriter); + strst.start(); + return true; + } else if (len == crstCmd) { + CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter); + crst.start(); + return true; + } else if (len == dumpCmd) { + DumpCommand dump = new DumpCommand(pwriter); + dump.start(); + return true; + } else if (len == statCmd || len == srvrCmd) { + StatCommand stat = new StatCommand(pwriter, len); + stat.start(); + return true; + } else if (len == consCmd) { + ConsCommand cons = new ConsCommand(pwriter); + cons.start(); + return true; + } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { + WatchCommand wcmd = new WatchCommand(pwriter, len); + wcmd.start(); + return true; } return false; } @@ -1618,7 +1766,8 @@ public class NIOServerCnxn implements Wa pwriter.print(((SocketChannel)channel).socket() .getRemoteSocketAddress()); pwriter.print("["); - pwriter.print(Integer.toHexString(sk.interestOps())); + pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps()) + : "0"); pwriter.print("](queued="); pwriter.print(getOutstandingRequests()); pwriter.print(",recved="); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue May 4 21:45:13 2010 @@ -226,6 +226,8 @@ public abstract class ClientBase extends OutputStream outstream = sock.getOutputStream(); outstream.write(cmd.getBytes()); outstream.flush(); + // this replicates NC - close the output stream before reading + sock.shutdownOutput(); reader = new BufferedReader(