Author: breed Date: Tue Jun 9 05:04:36 2009 New Revision: 782878 URL: http://svn.apache.org/viewvc?rev=782878&view=rev Log: ZOOKEEPER-336. single bad client can cause server to stop accepting connections
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/c/Makefile.am hadoop/zookeeper/trunk/src/c/tests/zkServer.sh hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun 9 05:04:36 2009 @@ -194,6 +194,8 @@ ZOOKEEPER-196. doxygen comment for state argument of watcher_fn typedef and implementation differ ("...one of the *_STATE constants, otherwise -1") (breed via mahadev) + + ZOOKEEPER-336. single bad client can cause server to stop accepting connections (henry robinson via breed) NEW FEATURES: Modified: hadoop/zookeeper/trunk/src/c/Makefile.am URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/Makefile.am?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/Makefile.am (original) +++ hadoop/zookeeper/trunk/src/c/Makefile.am Tue Jun 9 05:04:36 2009 @@ -70,9 +70,11 @@ TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \ tests/MocksBase.cc tests/ZKMocks.cc tests/Util.cc tests/ThreadingUtil.cc \ - tests/TestWatchers.cc \ + tests/TestClientRetry.cc \ tests/TestOperations.cc tests/TestZookeeperInit.cc \ - tests/TestZookeeperClose.cc tests/TestClient.cc + tests/TestZookeeperClose.cc tests/TestClient.cc \ + tests/TestWatchers.cc + SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt) Modified: hadoop/zookeeper/trunk/src/c/tests/zkServer.sh URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/zkServer.sh?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/zkServer.sh (original) +++ hadoop/zookeeper/trunk/src/c/tests/zkServer.sh Tue Jun 9 05:04:36 2009 @@ -53,12 +53,12 @@ if [ "x${base_dir}" == "x" ] then mkdir -p /tmp/zkdata - java -cp ../../zookeeper-dev.jar:../../src/java/lib/log4j-1.2.15.jar:../../conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log & + java -cp ../../zookeeper-dev.jar:../../src/java/lib/log4j-1.2.15.jar org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata 3000 $ZKMAXCNXNS &> /tmp/zk.log & pid=$! echo $! > /tmp/zk.pid else mkdir -p ${base_dir}/build/tmp/zkdata - java -cp ${base_dir}/zookeeper-dev.jar:${base_dir}/src/java/lib/log4j-1.2.15.jar:${base_dir}/conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 ${base_dir}/build/tmp/zkdata &> ${base_dir}/build/tmp/zk.log & + java -cp ${base_dir}/zookeeper-dev.jar:${base_dir}/src/java/lib/log4j-1.2.15.jar:${base_dir}/conf org.apache.zookeeper.server.ZooKeeperServerMain 22181 ${base_dir}/build/tmp/zkdata 3000 $ZKMAXCNXNS &> ${base_dir}/build/tmp/zk.log & echo $! > ${base_dir}/build/tmp/zk.pid fi sleep 5 Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original) +++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Tue Jun 9 05:04:36 2009 @@ -676,6 +676,19 @@ property. Yes - it's not consistent, and it's annoying.)</para> </listitem> </varlistentry> + <varlistentry> + <term>maxClientCnxns</term> + <listitem> + <para>(No Java system property)</para> + + <para>Limits the number of concurrent connections (at the socket + level) that a single client, identified by IP address, may make + to a single member of the ZooKeeper ensemble. This is used to + prevent certain classes of DoS attacks, including file + descriptor exhaustion. Setting this to 0 or omitting it entirely + removes the limit on concurrent connections.</para> + </listitem> + </varlistentry> </variablelist> </section> @@ -790,7 +803,7 @@ </listitem> </varlistentry> - <varlistentry> + <varlistentry> <term>weight.x=nnnnn</term> <listitem> Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Tue Jun 9 05:04:36 2009 @@ -89,6 +89,7 @@ private class MyWatcher implements Watcher { public void process(WatchedEvent event) { if (getPrintWatches()) { + ZooKeeperMain.printMessage("WATCHER::"); ZooKeeperMain.printMessage(event.toString()); } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue Jun 9 05:04:36 2009 @@ -32,6 +32,7 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -84,21 +85,42 @@ ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024); HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>(); + HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( ); int outstandingLimit = 1; - /** Create the factory, startup(zks) must be called subsequently. - * @param port listener port + int maxClientCnxns = 10; + + + /** + * Construct a new server connection factory which will accept an unlimited number + * of concurrent connections from each client (up to the file descriptor + * limits of the operating system). startup(zks) must be called subsequently. + * @param port * @throws IOException */ public Factory(int port) throws IOException { + this(port,0); + } + + + /** + * Constructs a new server connection factory where the number of concurrent connections + * from a single IP address is limited to maxcc (or unlimited if 0). + * startup(zks) must be called subsequently. + * @param port - the port to listen on for connections. + * @param maxcc - the number of concurrent connections allowed from a single client. + * @throws IOException + */ + public Factory(int port, int maxcc) throws IOException { super("NIOServerCxn.Factory:" + port); setDaemon(true); + maxClientCnxns = maxcc; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); ss.socket().bind(new InetSocketAddress(port)); ss.configureBlocking(false); - ss.register(selector, SelectionKey.OP_ACCEPT); + ss.register(selector, SelectionKey.OP_ACCEPT); } @Override @@ -109,9 +131,8 @@ } } - public void startup(ZooKeeperServer zks) - throws IOException, InterruptedException - { + public void startup(ZooKeeperServer zks) throws IOException, + InterruptedException { start(); zks.startup(); setZooKeeperServer(zks); @@ -142,6 +163,15 @@ private void addCnxn(NIOServerCnxn cnxn) { synchronized (cnxns) { cnxns.add(cnxn); + synchronized (ipMap){ + InetAddress addr = cnxn.sock.socket().getInetAddress(); + Set<NIOServerCnxn> s = ipMap.get(addr); + if (s == null) { + s = new HashSet<NIOServerCnxn>(); + } + s.add(cnxn); + ipMap.put(addr,s); + } } } @@ -150,7 +180,13 @@ return new NIOServerCnxn(zks, sock, sk, this); } - public void run() { + private int getClientCnxnCount( InetAddress cl) { + Set<NIOServerCnxn> s = ipMap.get(cl); + if (s == null) return 0; + return s.size(); + } + + public void run() { while (!ss.socket().isClosed()) { try { selector.select(1000); @@ -164,13 +200,21 @@ for (SelectionKey k : selectedList) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k - .channel()).accept(); - sc.configureBlocking(false); - SelectionKey sk = sc.register(selector, - SelectionKey.OP_READ); - NIOServerCnxn cnxn = createConnection(sc, sk); - sk.attach(cnxn); - addCnxn(cnxn); + .channel()).accept(); + InetAddress ia = sc.socket().getInetAddress(); + int cnxncount = getClientCnxnCount(ia); + if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ + LOG.warn("Too many connections from " + ia + + " - max is " + maxClientCnxns ); + sc.close(); + } else { + sc.configureBlocking(false); + SelectionKey sk = sc.register(selector, + SelectionKey.OP_READ); + NIOServerCnxn cnxn = createConnection(sc, sk); + sk.attach(cnxn); + addCnxn(cnxn); + } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); @@ -762,11 +806,16 @@ LOG.warn("Failed to unregister with JMX", e); } jmxConnectionBean = null; - + if (closed) { return; } closed = true; + synchronized (factory.ipMap) + { + Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress()); + s.remove(this); + } synchronized (factory.cnxns) { factory.cnxns.remove(this); } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java Tue Jun 9 05:04:36 2009 @@ -33,7 +33,8 @@ protected int clientPort; protected String dataDir; protected String dataLogDir; - protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME; + protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME; + protected int maxClientCnxns; /** * Parse arguments for server configuration @@ -42,7 +43,7 @@ * @throws IllegalArgumentException on invalid usage */ public void parse(String[] args) { - if (args.length < 2 || args.length > 3) { + if (args.length < 2 || args.length > 4) { throw new IllegalArgumentException("Invalid args:" + Arrays.toString(args)); } @@ -53,6 +54,9 @@ if (args.length == 3) { tickTime = Integer.parseInt(args[2]); } + if (args.length == 4) { + maxClientCnxns = Integer.parseInt(args[3]); + } } /** @@ -79,10 +83,12 @@ dataDir = config.getDataDir(); dataLogDir = config.getDataLogDir(); tickTime = config.getTickTime(); + maxClientCnxns = config.getMaxClientCnxns(); } public int getClientPort() { return clientPort; } public String getDataDir() { return dataDir; } public String getDataLogDir() { return dataLogDir; } public int getTickTime() { return tickTime; } + public int getMaxClientCnxns() { return maxClientCnxns; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Tue Jun 9 05:04:36 2009 @@ -36,7 +36,7 @@ Logger.getLogger(ZooKeeperServerMain.class); private static final String USAGE = - "Usage: ZooKeeperServerMain configfile | port datadir [ticktime]"; + "Usage: ZooKeeperServerMain configfile | port datadir [ticktime] [maxcnxns]"; private NIOServerCnxn.Factory cnxnFactory; @@ -103,7 +103,7 @@ File(config.dataLogDir), new File(config.dataDir)); zkServer.setTxnLogFactory(ftxn); zkServer.setTickTime(config.tickTime); - cnxnFactory = new NIOServerCnxn.Factory(config.clientPort); + cnxnFactory = new NIOServerCnxn.Factory(config.clientPort, config.getMaxClientCnxns()); cnxnFactory.startup(zkServer); cnxnFactory.join(); if (zkServer.isRunning()) { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Tue Jun 9 05:04:36 2009 @@ -49,6 +49,7 @@ protected int syncLimit; protected int electionAlg; protected int electionPort; + protected int maxClientCnxns = 10; protected final HashMap<Long,QuorumServer> servers = new HashMap<Long, QuorumServer>(); @@ -125,6 +126,8 @@ syncLimit = Integer.parseInt(value); } else if (key.equals("electionAlg")) { electionAlg = Integer.parseInt(value); + } else if (key.equals("maxClientCnxns")) { + maxClientCnxns = Integer.parseInt(value); } else if (key.startsWith("server.")) { int dot = key.indexOf('.'); long sid = Long.parseLong(key.substring(dot + 1)); @@ -259,7 +262,9 @@ public int getInitLimit() { return initLimit; } public int getSyncLimit() { return syncLimit; } public int getElectionAlg() { return electionAlg; } - public int getElectionPort() { return electionPort; } + public int getElectionPort() { return electionPort; } + public int getMaxClientCnxns() { return maxClientCnxns; } + public QuorumVerifier getQuorumVerifier() { return quorumVerifier; } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Tue Jun 9 05:04:36 2009 @@ -118,7 +118,7 @@ LOG.info("Starting quorum peer"); try { NIOServerCnxn.Factory cnxnFactory = - new NIOServerCnxn.Factory(config.getClientPort()); + new NIOServerCnxn.Factory(config.getClientPort(), config.getMaxClientCnxns()); quorumPeer = new QuorumPeer(); quorumPeer.setClientPort(config.getClientPort()); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=782878&r1=782877&r2=782878&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jun 9 05:04:36 2009 @@ -52,6 +52,7 @@ new File(System.getProperty("build.test.dir", "build")); protected String hostPort = "127.0.0.1:33221"; + protected int maxCnxns = 0; protected NIOServerCnxn.Factory serverFactory = null; protected File tmpDir = null; public ClientBase() { @@ -253,15 +254,15 @@ return tmpDir; } - + static NIOServerCnxn.Factory createNewServerInstance(File dataDir, - NIOServerCnxn.Factory factory, String hostPort) + NIOServerCnxn.Factory factory, String hostPort, int maxCnxns) throws IOException, InterruptedException { ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000); final int PORT = Integer.parseInt(hostPort.split(":")[1]); if (factory == null) { - factory = new NIOServerCnxn.Factory(PORT); + factory = new NIOServerCnxn.Factory(PORT,maxCnxns); } factory.startup(zks); @@ -314,7 +315,7 @@ protected void startServer() throws Exception { LOG.info("STARTING server"); - serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort); + serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns); // ensure that only server and data bean are registered JMXEnv.ensureOnly("InMemoryDataTree", "StandaloneServer_port"); } @@ -326,7 +327,7 @@ // ensure no beans are leftover JMXEnv.ensureOnly(); } - + @Override protected void tearDown() throws Exception { LOG.info("tearDown starting");