Author: mahadev
Date: Tue May  4 21:45:13 2010
New Revision: 941056

URL: http://svn.apache.org/viewvc?rev=941056&view=rev
Log:
ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=941056&r1=941055&r2=941056&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue May  4 21:45:13 2010
@@ -44,6 +44,8 @@ BUGFIXES: 
   ZOOKEEPER-758. zkpython segfaults on invalid acl with missing key
   (Kapil Thangavelu via henryr)
 
+  ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=941056&r1=941055&r2=941056&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue May  4 21:45:13 2010
@@ -39,7 +39,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -386,6 +385,29 @@ public class NIOServerCnxn implements Wa
         sendBuffer(closeConn);
     }
 
+    /**
+     * send buffer without using the asynchronous
+     * calls to selector and then close the socket
+     * @param bb
+     */
+    void sendBufferSync(ByteBuffer bb) {
+       try {
+           /* configure socket to be blocking
+            * so that we dont have to do write in 
+            * a tight while loop
+            */
+           sock.configureBlocking(true);
+           if (bb != closeConn) {
+               if (sock != null) {
+                   sock.write(bb);
+               }
+               packetSent();
+           } 
+       } catch (IOException ie) {
+           LOG.error("Error sending data synchronously ", ie);
+       }
+    }
+    
     void sendBuffer(ByteBuffer bb) {
         try {
             if (bb != closeConn) {
@@ -497,6 +519,11 @@ public class NIOServerCnxn implements Wa
                     if (isPayload) { // not the case for 4letterword
                         readPayload();
                     }
+                    else {
+                        // four letter words take care
+                        // need not do anything else
+                        return;
+                    }
                 }
             }
             if (k.isWritable()) {
@@ -886,6 +913,30 @@ public class NIOServerCnxn implements Wa
     }
 
     /**
+     * clean up the socket related to a command and also make sure we flush the
+     * data before we do that
+     * 
+     * @param pwriter
+     *            the pwriter for a command socket
+     */
+    private void cleanupWriterSocket(PrintWriter pwriter) {
+        try {
+            if (pwriter != null) {
+                pwriter.flush();
+                pwriter.close();
+            }
+        } catch (Exception e) {
+            LOG.info("Error closing PrintWriter ", e);
+        } finally {
+            try {
+                close();
+            } catch (Exception e) {
+                LOG.error("Error closing a command socket ", e);
+            }
+        }
+    }
+    
+    /**
      * This class wraps the sendBuffer method of NIOServerCnxn. It is
      * responsible for chunking up the response to a client. Rather
      * than cons'ing up a response fully in memory, which may be large
@@ -893,50 +944,23 @@ public class NIOServerCnxn implements Wa
      */
     private class SendBufferWriter extends Writer {
         private StringBuffer sb = new StringBuffer();
-
-        /* FYI: clearing the READ interestOps on the key results in
-         * the cnxn being closed in doIO.
-         */
-
+        
         /**
          * Check if we are ready to send another chunk.
          * @param force force sending, even if not a full chunk
          */
         private void checkFlush(boolean force) {
             if ((force && sb.length() > 0) || sb.length() > 2048) {
-                sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
-                // including op_read keeps doio from closing the conn
-                wakeup(SelectionKey.OP_READ
-                    | SelectionKey.OP_WRITE);
-
+                sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
                 // clear our internal buffer
                 sb.setLength(0);
             }
         }
 
-        /**
-         * Wakeup the selector. This is necessary as the cnxn is
-         * waiting for interestOps to be satisfied. If we want the
-         * selector to wakeup immediately (rather than the last
-         * select(timeout) period) we need to force a wakeup.
-         * @param sel the new interest ops
-         */
-        private void wakeup(int sel) {
-            synchronized(factory) {
-                sk.selector().wakeup();
-                sk.interestOps(sel);
-            }
-        }
-
         @Override
         public void close() throws IOException {
             if (sb == null) return;
-
             checkFlush(true);
-
-            // nothing left, please close
-            wakeup(SelectionKey.OP_WRITE);
-
             sb = null; // clear out the ref to ensure no reuse
         }
 
@@ -954,10 +978,254 @@ public class NIOServerCnxn implements Wa
 
     private static final String ZK_NOT_SERVING =
         "This ZooKeeper instance is not currently serving requests";
+    
+    /**
+     * Set of threads for commmand ports. All the 4
+     * letter commands are run via a thread. Each class
+     * maps to a correspoding 4 letter command. CommandThread
+     * is the abstract class from which all the others inherit.
+     */
+    private abstract class CommandThread extends Thread {
+        PrintWriter pw;
+        
+        CommandThread(PrintWriter pw) {
+            this.pw = pw;
+        }
+        
+        public void run() {
+            try {
+                commandRun();
+            } catch (IOException ie) {
+                LOG.error("Error in running command ", ie);
+            } finally {
+                cleanupWriterSocket(pw);
+            }
+        }
+        
+        public abstract void commandRun() throws IOException;
+    }
+    
+    private class RuokCommand extends CommandThread {
+        public RuokCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            pw.print("imok");
+            
+        }
+    }
+    
+    private class TraceMaskCommand extends CommandThread {
+        TraceMaskCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            long traceMask = ZooTrace.getTextTraceLevel();
+            pw.print(traceMask);
+        }
+    }
+    
+    private class SetTraceMaskCommand extends CommandThread {
+        long trace = 0;
+        SetTraceMaskCommand(PrintWriter pw, long trace) {
+            super(pw);
+            this.trace = trace;
+        }
+        
+        @Override
+        public void commandRun() {
+            pw.print(trace);
+        }
+    }
+    
+    private class EnvCommand extends CommandThread {
+        EnvCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            List<Environment.Entry> env = Environment.list();
+
+            pw.println("Environment:");
+            for(Environment.Entry e : env) {
+                pw.print(e.getKey());
+                pw.print("=");
+                pw.println(e.getValue());
+            }
+            
+        } 
+    }
+    
+    private class ConfCommand extends CommandThread {
+        ConfCommand(PrintWriter pw) {
+            super(pw);
+        }
+            
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                zk.dumpConf(pw);
+            }
+        }
+    }
+    
+    private class StatResetCommand extends CommandThread {
+        public StatResetCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else { 
+                zk.serverStats().reset();
+                pw.println("Server stats reset.");
+            }
+        }
+    }
+    
+    private class CnxnStatResetCommand extends CommandThread {
+        public CnxnStatResetCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                synchronized(factory.cnxns){
+                    for(NIOServerCnxn c : factory.cnxns){
+                        c.getStats().reset();
+                    }
+                }
+                pw.println("Connection stats reset.");
+            }
+        }
+    }
+
+    private class DumpCommand extends CommandThread {
+        public DumpCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else {
+                pw.println("SessionTracker dump:");
+                zk.sessionTracker.dumpSessions(pw);
+                pw.println("ephemeral nodes dump:");
+                zk.dumpEphemerals(pw);
+            }
+        }
+    }
+    
+    private class StatCommand extends CommandThread {
+        int len;
+        public StatCommand(PrintWriter pw, int len) {
+            super(pw);
+            this.len = len;
+        }
+        
+        @SuppressWarnings("unchecked")
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else {   
+                pw.print("Zookeeper version: ");
+                pw.println(Version.getFullVersion());
+                if (len == statCmd) {
+                    LOG.info("Stat command output");
+                    pw.println("Clients:");
+                    // clone should be faster than iteration
+                    // ie give up the cnxns lock faster
+                    HashSet<NIOServerCnxn> cnxnset;
+                    synchronized(factory.cnxns){
+                        cnxnset = (HashSet<NIOServerCnxn>)factory
+                        .cnxns.clone();
+                    }
+                    for(NIOServerCnxn c : cnxnset){
+                        ((CnxnStats)c.getStats())
+                        .dumpConnectionInfo(pw, true);
+                    }
+                    pw.println();
+                }
+                pw.print(zk.serverStats().toString());
+                pw.print("Node count: ");
+                pw.println(zk.getZKDatabase().getNodeCount());
+            }
+            
+        }
+    }
+    
+    private class ConsCommand extends CommandThread {
+        public ConsCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @SuppressWarnings("unchecked")
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                // clone should be faster than iteration
+                // ie give up the cnxns lock faster
+                HashSet<NIOServerCnxn> cnxns;
+                synchronized (factory.cnxns) {
+                    cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
+                }
+                for (NIOServerCnxn c : cnxns) {
+                    ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false);
+                }
+                pw.println();
+            }
+        }
+    }
+    
+    private class WatchCommand extends CommandThread {
+        int len = 0;
+        public WatchCommand(PrintWriter pw, int len) {
+            super(pw);
+            this.len = len;
+        }
 
+        @Override
+        public void commandRun() {
+            if (zk == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                DataTree dt = zk.getZKDatabase().getDataTree();
+                if (len == wchsCmd) {
+                    dt.dumpWatchesSummary(pw);
+                } else if (len == wchpCmd) {
+                    dt.dumpWatches(pw, true);
+                } else {
+                    dt.dumpWatches(pw, false);
+                }
+                pw.println();
+            }
+        }
+    }
+    
+    
     /** Return if four letter word found and responded to, otw false **/
     private boolean checkFourLetterWord(final SelectionKey k, final int len)
-        throws IOException
+    throws IOException
     {
         // We take advantage of the limited size of the length to look
         // for cmds. They are all 4-bytes which fits inside of an int
@@ -969,197 +1237,77 @@ public class NIOServerCnxn implements Wa
                 + sock.socket().getRemoteSocketAddress());
         packetReceived();
 
+        /** cancel the selection key to remove the socket handling
+         * from selector. This is to prevent netcat problem wherein
+         * netcat immediately closes the sending side after sending the
+         * commands and still keeps the receiving channel open. 
+         * The idea is to remove the selectionkey from the selector
+         * so that the selector does not notice the closed read on the
+         * socket channel and keep the socket alive to write the data to
+         * and makes sure to close the socket after its done writing the data
+         */
+        if (k != null) {
+            try {
+                k.cancel();
+            } catch(Exception e) {
+                LOG.error("Error cancelling command selection key ", e);
+            }
+        }
+
         final PrintWriter pwriter = new PrintWriter(
                 new BufferedWriter(new SendBufferWriter()));
-        boolean threadWillClosePWriter = false;
-        try {
-            if (len == ruokCmd) {
-                pwriter.print("imok");
-                return true;
-            } else if (len == getTraceMaskCmd) {
-                long traceMask = ZooTrace.getTextTraceLevel();
-                pwriter.print(traceMask);
-                return true;
-            } else if (len == setTraceMaskCmd) {
-                int rc = sock.read(incomingBuffer);
-                if (rc < 0) {
-                    throw new IOException("Read error");
-                }
-
-                incomingBuffer.flip();
-                long traceMask = incomingBuffer.getLong();
-                ZooTrace.setTextTraceLevel(traceMask);
-                pwriter.print(traceMask);
-                return true;
-            } else if (len == enviCmd) {
-                List<Environment.Entry> env = Environment.list();
-
-                pwriter.println("Environment:");
-                for(Environment.Entry e : env) {
-                    pwriter.print(e.getKey());
-                    pwriter.print("=");
-                    pwriter.println(e.getValue());
-                }
-                return true;
-            } else if (len == confCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                zk.dumpConf(pwriter);
-                return true;
-            } else if (len == srstCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                zk.serverStats().reset();
-                pwriter.println("Server stats reset.");
-                return true;
-            } else if (len == crstCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                synchronized(factory.cnxns){
-                    for(NIOServerCnxn c : factory.cnxns){
-                        c.getStats().reset();
-                    }
-                }
-                pwriter.println("Connection stats reset.");
-                return true;
-            } else if (len == dumpCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                // this could be a long running task, spawn a thread so
-                // that we don't block the processing of other requests
-                threadWillClosePWriter = true;
-                new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            pwriter.println("SessionTracker dump:");
-                            zk.sessionTracker.dumpSessions(pwriter);
-                            pwriter.println("ephemeral nodes dump:");
-                            zk.dumpEphemerals(pwriter);
-                        } finally {
-                            pwriter.flush();
-                            pwriter.close();
-                        }
-                    }
-                }.start();
-
-                return true;
-            } else if (len == statCmd || len == srvrCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                // this could be a long running task, spawn a thread so
-                // that we don't block the processing of other requests
-                threadWillClosePWriter = true;
-                new Thread() {
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void run() {
-                        try {
-                            pwriter.print("Zookeeper version: ");
-                            pwriter.println(Version.getFullVersion());
-                            if (len == statCmd) {
-                                pwriter.println("Clients:");
-                                // clone should be faster than iteration
-                                // ie give up the cnxns lock faster
-                                HashSet<NIOServerCnxn> cnxns;
-                                synchronized(factory.cnxns){
-                                    cnxns = (HashSet<NIOServerCnxn>)factory
-                                        .cnxns.clone();
-                                }
-                                for(NIOServerCnxn c : cnxns){
-                                    ((CnxnStats)c.getStats())
-                                        .dumpConnectionInfo(pwriter, true);
-                                }
-                                pwriter.println();
-                            }
-                            pwriter.print(zk.serverStats().toString());
-                            pwriter.print("Node count: ");
-                            pwriter.println(zk.getZKDatabase().getNodeCount());
-                        } finally {
-                            pwriter.flush();
-                            pwriter.close();
-                        }
-                    }
-                }.start();
-                return true;
-            } else if (len == consCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                // this could be a long running task, spawn a thread so
-                // that we don't block the processing of other requests
-                threadWillClosePWriter = true;
-                new Thread() {
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void run() {
-                        try {
-                            // clone should be faster than iteration
-                            // ie give up the cnxns lock faster
-                            HashSet<NIOServerCnxn> cnxns;
-                            synchronized(factory.cnxns){
-                                cnxns = (HashSet<NIOServerCnxn>)factory
-                                    .cnxns.clone();
-                            }
-                            for(NIOServerCnxn c : cnxns){
-                                ((CnxnStats)c.getStats())
-                                    .dumpConnectionInfo(pwriter, false);
-                            }
-                            pwriter.println();
-                        } finally {
-                            pwriter.flush();
-                            pwriter.close();
-                        }
-                    }
-                }.start();
-                return true;
-            } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
-                if (zk == null) {
-                    pwriter.println(ZK_NOT_SERVING);
-                    return true;
-                }
-                // this could be a long running task, spawn a thread so
-                // that we don't block the processing of other requests
-                threadWillClosePWriter = true;
-                new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            DataTree dt = zk.getZKDatabase().getDataTree();
-                            if (len == wchsCmd) {
-                                dt.dumpWatchesSummary(pwriter);
-                            } else if (len == wchpCmd) {
-                                dt.dumpWatches(pwriter, true);
-                            } else {
-                                dt.dumpWatches(pwriter, false);
-                            }
-                            pwriter.println();
-                        } finally {
-                            pwriter.flush();
-                            pwriter.close();
-                        }
-                    }
-                }.start();
-                return true;
-            }
-        } finally {
-            // if we spawned a thread it is responsible for eventually
-            // flushing and closeing the writer
-            if (!threadWillClosePWriter) {
-                pwriter.flush();
-                pwriter.close();
+        if (len == ruokCmd) {
+            RuokCommand ruok = new RuokCommand(pwriter);
+            ruok.start();
+            return true;
+        } else if (len == getTraceMaskCmd) {
+            TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
+            tmask.start();
+            return true;
+        } else if (len == setTraceMaskCmd) {
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new IOException("Read error");
             }
+
+            incomingBuffer.flip();
+            long traceMask = incomingBuffer.getLong();
+            ZooTrace.setTextTraceLevel(traceMask);
+            SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
+            setMask.start();
+            return true;
+        } else if (len == enviCmd) {
+            EnvCommand env = new EnvCommand(pwriter);
+            env.start();
+            return true;
+        } else if (len == confCmd) {
+            ConfCommand ccmd = new ConfCommand(pwriter);
+            ccmd.start();
+            return true;
+        } else if (len == srstCmd) {
+            StatResetCommand strst = new StatResetCommand(pwriter);
+            strst.start();
+            return true;
+        } else if (len == crstCmd) {
+            CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
+            crst.start();
+            return true;
+        } else if (len == dumpCmd) {
+            DumpCommand dump = new DumpCommand(pwriter);
+            dump.start();
+            return true;
+        } else if (len == statCmd || len == srvrCmd) {
+            StatCommand stat = new StatCommand(pwriter, len);
+            stat.start();
+            return true;
+        } else if (len == consCmd) {
+            ConsCommand cons = new ConsCommand(pwriter);
+            cons.start();
+            return true;
+        } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
+            WatchCommand wcmd = new WatchCommand(pwriter, len);
+            wcmd.start();
+            return true;
         }
         return false;
     }
@@ -1618,7 +1766,8 @@ public class NIOServerCnxn implements Wa
                 pwriter.print(((SocketChannel)channel).socket()
                         .getRemoteSocketAddress());
                 pwriter.print("[");
-                pwriter.print(Integer.toHexString(sk.interestOps()));
+                pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps())
+                        : "0");
                 pwriter.print("](queued=");
                 pwriter.print(getOutstandingRequests());
                 pwriter.print(",recved=");

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=941056&r1=941055&r2=941056&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue May  4 21:45:13 2010
@@ -226,6 +226,8 @@ public abstract class ClientBase extends
             OutputStream outstream = sock.getOutputStream();
             outstream.write(cmd.getBytes());
             outstream.flush();
+            // this replicates NC - close the output stream before reading
+            sock.shutdownOutput();
 
             reader =
                 new BufferedReader(


Reply via email to