Author: breed
Date: Tue Jun  9 05:04:36 2009
New Revision: 782878

URL: http://svn.apache.org/viewvc?rev=782878&view=rev
Log:
ZOOKEEPER-336. single bad client can cause server to stop accepting connections

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/c/Makefile.am
    hadoop/zookeeper/trunk/src/c/tests/zkServer.sh
    hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun  9 05:04:36 2009
@@ -194,6 +194,8 @@
   ZOOKEEPER-196. doxygen comment for state argument of watcher_fn typedef and
 implementation differ ("...one of the *_STATE constants, otherwise -1") (breed
 via mahadev)
+
+  ZOOKEEPER-336. single bad client can cause server to stop accepting connections (henry robinson via breed)
  
 NEW FEATURES:
 

Modified: hadoop/zookeeper/trunk/src/c/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/Makefile.am?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/Makefile.am (original)
+++ hadoop/zookeeper/trunk/src/c/Makefile.am Tue Jun  9 05:04:36 2009
@@ -70,9 +70,11 @@
 
 TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \
     tests/MocksBase.cc  tests/ZKMocks.cc tests/Util.cc tests/ThreadingUtil.cc \
-    tests/TestWatchers.cc \
+    tests/TestClientRetry.cc \
     tests/TestOperations.cc tests/TestZookeeperInit.cc \
-    tests/TestZookeeperClose.cc tests/TestClient.cc
+    tests/TestZookeeperClose.cc tests/TestClient.cc \
+    tests/TestWatchers.cc
+
 
 SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
 

Modified: hadoop/zookeeper/trunk/src/c/tests/zkServer.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/zkServer.sh?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/zkServer.sh (original)
+++ hadoop/zookeeper/trunk/src/c/tests/zkServer.sh Tue Jun  9 05:04:36 2009
@@ -53,12 +53,12 @@
        if [ "x${base_dir}" == "x" ]
         then
        mkdir -p /tmp/zkdata
-        java -cp ../../zookeeper-dev.jar:../../src/java/lib/log4j-1.2.15.jar:../../conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log &
+        java -cp ../../zookeeper-dev.jar:../../src/java/lib/log4j-1.2.15.jar org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata 3000 $ZKMAXCNXNS &> /tmp/zk.log &
         pid=$!
         echo $! > /tmp/zk.pid        
         else
         mkdir -p ${base_dir}/build/tmp/zkdata
-        java -cp ${base_dir}/zookeeper-dev.jar:${base_dir}/src/java/lib/log4j-1.2.15.jar:${base_dir}/conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 ${base_dir}/build/tmp/zkdata &> ${base_dir}/build/tmp/zk.log &
+        java -cp ${base_dir}/zookeeper-dev.jar:${base_dir}/src/java/lib/log4j-1.2.15.jar:${base_dir}/conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 ${base_dir}/build/tmp/zkdata 3000 $ZKMAXCNXNS &> ${base_dir}/build/tmp/zk.log &
         echo $! > ${base_dir}/build/tmp/zk.pid
        fi
         sleep 5

Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Tue Jun  9 05:04:36 2009
@@ -676,6 +676,19 @@
               property. Yes - it's not consistent, and it's annoying.)</para>
             </listitem>
           </varlistentry>
+          <varlistentry>
+            <term>maxClientCnxns</term>
+            <listitem>
+              <para>(No Java system property)</para>
+
+              <para>Limits the number of concurrent connections (at the socket 
+              level) that a single client, identified by IP address, may make
+              to a single member of the ZooKeeper ensemble. This is used to 
+              prevent certain classes of DoS attacks, including file 
+              descriptor exhaustion. Setting this to 0 or omitting it entirely 
+              removes the limit on concurrent connections.</para>
+            </listitem>
+           </varlistentry>
         </variablelist>
       </section>
 
@@ -790,7 +803,7 @@
             </listitem>
           </varlistentry>
 
-       <varlistentry>
+          <varlistentry>
             <term>weight.x=nnnnn</term>
 
             <listitem>

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Tue Jun  9 05:04:36 2009
@@ -89,6 +89,7 @@
     private class MyWatcher implements Watcher {
         public void process(WatchedEvent event) {
             if (getPrintWatches()) {
+                ZooKeeperMain.printMessage("WATCHER::");
                 ZooKeeperMain.printMessage(event.toString());
             }
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue Jun  9 05:04:36 2009
@@ -32,6 +32,7 @@
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -84,21 +85,42 @@
         ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
 
         HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
+        HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( ); 
 
         int outstandingLimit = 1;
 
-        /** Create the factory, startup(zks) must be called subsequently.
-         * @param port listener port
+        int maxClientCnxns = 10;  
+        
+        
+        /**
+         * Construct a new server connection factory which will accept an unlimited number
+         * of concurrent connections from each client (up to the file descriptor
+         * limits of the operating system). startup(zks) must be called subsequently.
+         * @param port
          * @throws IOException
          */
         public Factory(int port) throws IOException {
+            this(port,0);
+        }
+        
+        
+        /**
+         * Constructs a new server connection factory where the number of concurrent connections
+         * from a single IP address is limited to maxcc (or unlimited if 0).
+         * startup(zks) must be called subsequently.
+         * @param port - the port to listen on for connections. 
+         * @param maxcc - the number of concurrent connections allowed from a single client.
+         * @throws IOException
+         */
+        public Factory(int port, int maxcc) throws IOException {
             super("NIOServerCxn.Factory:" + port);
             setDaemon(true);
+            maxClientCnxns = maxcc;
             this.ss = ServerSocketChannel.open();
             ss.socket().setReuseAddress(true);
             ss.socket().bind(new InetSocketAddress(port));
             ss.configureBlocking(false);
-            ss.register(selector, SelectionKey.OP_ACCEPT);
+            ss.register(selector, SelectionKey.OP_ACCEPT);         
         }
 
         @Override
@@ -109,9 +131,8 @@
             }
         }
 
-        public void startup(ZooKeeperServer zks)
-            throws IOException, InterruptedException
-        {
+        public void startup(ZooKeeperServer zks) throws IOException,
+                InterruptedException {
             start();
             zks.startup();
             setZooKeeperServer(zks);
@@ -142,6 +163,15 @@
         private void addCnxn(NIOServerCnxn cnxn) {
             synchronized (cnxns) {
                 cnxns.add(cnxn);
+                synchronized (ipMap){
+                    InetAddress addr = cnxn.sock.socket().getInetAddress();
+                    Set<NIOServerCnxn> s = ipMap.get(addr);
+                    if (s == null) {
+                        s = new HashSet<NIOServerCnxn>();
+                    }
+                    s.add(cnxn);
+                    ipMap.put(addr,s);
+                }                
             }
         }
 
@@ -150,7 +180,13 @@
             return new NIOServerCnxn(zks, sock, sk, this);
         }
 
-        public void run() {
+        private int getClientCnxnCount( InetAddress cl) {
+            Set<NIOServerCnxn> s = ipMap.get(cl);
+            if (s == null) return 0;
+            return s.size();
+        }
+        
+        public void run() {         
             while (!ss.socket().isClosed()) {
                 try {
                     selector.select(1000);
@@ -164,13 +200,21 @@
                     for (SelectionKey k : selectedList) {
                         if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                             SocketChannel sc = ((ServerSocketChannel) k
-                                    .channel()).accept();
-                            sc.configureBlocking(false);
-                            SelectionKey sk = sc.register(selector,
-                                    SelectionKey.OP_READ);
-                            NIOServerCnxn cnxn = createConnection(sc, sk);
-                            sk.attach(cnxn);
-                            addCnxn(cnxn);
+                                    .channel()).accept();     
+                            InetAddress ia = sc.socket().getInetAddress();
+                            int cnxncount = getClientCnxnCount(ia); 
+                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
+                                LOG.warn("Too many connections from " + ia 
+                                         + " - max is " + maxClientCnxns );
+                                sc.close();                 
+                            } else {
+                                sc.configureBlocking(false);
+                                SelectionKey sk = sc.register(selector,
+                                        SelectionKey.OP_READ);
+                                NIOServerCnxn cnxn = createConnection(sc, sk);
+                                sk.attach(cnxn);
+                                addCnxn(cnxn);
+                            }                            
                         } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                             NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                             c.doIO(k);
@@ -762,11 +806,16 @@
             LOG.warn("Failed to unregister with JMX", e);
         }
         jmxConnectionBean = null;
-
+        
         if (closed) {
             return;
         }
         closed = true;
+        synchronized (factory.ipMap)
+        {
+            Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress());
+            s.remove(this);           
+        }
         synchronized (factory.cnxns) {
             factory.cnxns.remove(this);
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java Tue Jun  9 05:04:36 2009
@@ -33,7 +33,8 @@
     protected int clientPort;
     protected String dataDir;
     protected String dataLogDir;
-    protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
+    protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;    
+    protected int maxClientCnxns;
 
     /**
      * Parse arguments for server configuration
@@ -42,7 +43,7 @@
      * @throws IllegalArgumentException on invalid usage
      */
     public void parse(String[] args) {
-        if (args.length < 2 || args.length > 3) {
+        if (args.length < 2 || args.length > 4) {
             throw new IllegalArgumentException("Invalid args:"
                     + Arrays.toString(args));
         }
@@ -53,6 +54,9 @@
         if (args.length == 3) {
             tickTime = Integer.parseInt(args[2]);
         }
+        if (args.length == 4) {
+            maxClientCnxns = Integer.parseInt(args[3]);
+        }
     }
 
     /**
@@ -79,10 +83,12 @@
       dataDir = config.getDataDir();
       dataLogDir = config.getDataLogDir();
       tickTime = config.getTickTime();
+      maxClientCnxns = config.getMaxClientCnxns();
     }
 
     public int getClientPort() { return clientPort; }
     public String getDataDir() { return dataDir; }
     public String getDataLogDir() { return dataLogDir; }
     public int getTickTime() { return tickTime; }
+    public int getMaxClientCnxns() { return maxClientCnxns; }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Tue Jun  9 05:04:36 2009
@@ -36,7 +36,7 @@
         Logger.getLogger(ZooKeeperServerMain.class);
 
     private static final String USAGE =
-        "Usage: ZooKeeperServerMain configfile | port datadir [ticktime]";
+        "Usage: ZooKeeperServerMain configfile | port datadir [ticktime] [maxcnxns]";
 
     private NIOServerCnxn.Factory cnxnFactory;
 
@@ -103,7 +103,7 @@
                    File(config.dataLogDir), new File(config.dataDir));
             zkServer.setTxnLogFactory(ftxn);
             zkServer.setTickTime(config.tickTime);
-            cnxnFactory = new NIOServerCnxn.Factory(config.clientPort);
+            cnxnFactory = new NIOServerCnxn.Factory(config.clientPort, config.getMaxClientCnxns());
             cnxnFactory.startup(zkServer);
             cnxnFactory.join();
             if (zkServer.isRunning()) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Tue Jun  9 05:04:36 2009
@@ -49,6 +49,7 @@
     protected int syncLimit;
     protected int electionAlg;
     protected int electionPort;
+    protected int maxClientCnxns = 10;
     protected final HashMap<Long,QuorumServer> servers =
         new HashMap<Long, QuorumServer>();
 
@@ -125,6 +126,8 @@
                 syncLimit = Integer.parseInt(value);
             } else if (key.equals("electionAlg")) {
                 electionAlg = Integer.parseInt(value);
+            } else if (key.equals("maxClientCnxns")) {
+                maxClientCnxns = Integer.parseInt(value);
             } else if (key.startsWith("server.")) {
                 int dot = key.indexOf('.');
                 long sid = Long.parseLong(key.substring(dot + 1));
@@ -259,7 +262,9 @@
     public int getInitLimit() { return initLimit; }
     public int getSyncLimit() { return syncLimit; }
     public int getElectionAlg() { return electionAlg; }
-    public int getElectionPort() { return electionPort; }
+    public int getElectionPort() { return electionPort; }    
+    public int getMaxClientCnxns() { return maxClientCnxns; }
+    
     public QuorumVerifier getQuorumVerifier() {   
         return quorumVerifier;
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Tue Jun  9 05:04:36 2009
@@ -118,7 +118,7 @@
       LOG.info("Starting quorum peer");
       try {
           NIOServerCnxn.Factory cnxnFactory =
-              new NIOServerCnxn.Factory(config.getClientPort());
+              new NIOServerCnxn.Factory(config.getClientPort(), config.getMaxClientCnxns());
   
           quorumPeer = new QuorumPeer();
           quorumPeer.setClientPort(config.getClientPort());

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=782878&r1=782877&r2=782878&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jun  9 05:04:36 2009
@@ -52,6 +52,7 @@
         new File(System.getProperty("build.test.dir", "build"));
 
     protected String hostPort = "127.0.0.1:33221";
+    protected int maxCnxns = 0;
     protected NIOServerCnxn.Factory serverFactory = null;
     protected File tmpDir = null;
     public ClientBase() {
@@ -253,15 +254,15 @@
         
         return tmpDir;
     }
-    
+        
     static NIOServerCnxn.Factory createNewServerInstance(File dataDir,
-            NIOServerCnxn.Factory factory, String hostPort)
+            NIOServerCnxn.Factory factory, String hostPort, int maxCnxns)
         throws IOException, InterruptedException 
     {
         ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
         final int PORT = Integer.parseInt(hostPort.split(":")[1]);
         if (factory == null) {
-            factory = new NIOServerCnxn.Factory(PORT);
+            factory = new NIOServerCnxn.Factory(PORT,maxCnxns);
         }
         factory.startup(zks);
 
@@ -314,7 +315,7 @@
 
     protected void startServer() throws Exception {
         LOG.info("STARTING server");
-        serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort);
+        serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
         // ensure that only server and data bean are registered
         JMXEnv.ensureOnly("InMemoryDataTree", "StandaloneServer_port");
     }
@@ -326,7 +327,7 @@
         // ensure no beans are leftover
         JMXEnv.ensureOnly();
     }
-    
+        
     @Override
     protected void tearDown() throws Exception {
         LOG.info("tearDown starting");


Reply via email to