Author: phunt
Date: Tue Oct 19 22:33:53 2010
New Revision: 1024439

URL: http://svn.apache.org/viewvc?rev=1024439&view=rev
Log:
ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests

Modified:
    hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
    
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1024439&r1=1024438&r2=1024439&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Tue Oct 19 22:33:53 2010
@@ -41,6 +41,9 @@ BUGFIXES:
   ZOOKEEPER-855. clientPortBindAddress should be clientPortAddress
   (Jared Cantwell via fpj)
 
+  ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests
+  (Thijs Terlouw via phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-789. Improve FLE log messages (flavio via phunt)

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1024439&r1=1024438&r2=1024439&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 Tue Oct 19 22:33:53 2010
@@ -671,28 +671,33 @@ public class QuorumCnxManager {
                     }
                     msgLength.position(0);
                     int length = msgLength.getInt();
+                    if(length <= 0) {
+                        throw new IOException("Invalid packet length:" + 
length);
+                    }
                     /**
                      * Allocates a new ByteBuffer to receive the message
                      */
-                    if (length > 0) {
-                        if (length > PACKETMAXSIZE) {
-                            throw new IOException("Invalid packet of length " 
+ length);
-                        }
-                        byte[] msgArray = new byte[length];
-                        ByteBuffer message = ByteBuffer.wrap(msgArray);
-                        int numbytes = 0;
-                        while (message.hasRemaining()) {
-                            numbytes += channel.read(message);
-                        }
-                        message.position(0);
-                        synchronized (recvQueue) {
-                            recvQueue
-                                    .put(new Message(message.duplicate(), 
sid));
+                    if (length > PACKETMAXSIZE) {
+                        throw new IOException("Invalid packet of length " + 
length);
+                    }
+                    byte[] msgArray = new byte[length];
+                    ByteBuffer message = ByteBuffer.wrap(msgArray);
+                    int numbytes = 0;
+                    int temp_numbytes = 0;
+                    while (message.hasRemaining()) {
+                        temp_numbytes = channel.read(message); 
+                        if(temp_numbytes < 0) {
+                            throw new IOException("Channel eof before end");
                         }
-                        msgLength.position(0);
+                        numbytes += temp_numbytes;
+                    }
+                    message.position(0);
+                    synchronized (recvQueue) {
+                        recvQueue
+                        .put(new Message(message.duplicate(), sid));
                     }
+                    msgLength.position(0);
                 }
-
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                         self.getId() + ", error = " + e);

Modified: 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1024439&r1=1024438&r2=1024439&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
 Tue Oct 19 22:33:53 2010
@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -211,6 +212,63 @@ public class CnxManagerTest extends Test
         if((end - begin) > 6000) fail("Waited more than necessary");
     
     }
- 
     
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the 
+     * connection is being closed upon a message with negative
+     * length.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {               
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 
3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        
+        Thread.sleep(1000);
+        
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000);
+        
+        /*
+         * Write id first then negative length.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(new Long(2));
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        msgBuffer = ByteBuffer.wrap(new byte[4]);
+        msgBuffer.putInt(-20);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        Thread.sleep(1000);
+        
+        try{
+            /*
+             * Write a number of times until it
+             * detects that the socket is broken.
+             */
+            for(int i = 0; i < 100; i++){
+                msgBuffer.position(0);
+                sc.write(msgBuffer);
+            }
+            fail("Socket has not been closed");
+        } catch (Exception e) {
+            LOG.info("Socket has been closed as expected");
+        }
+    }   
 }
\ No newline at end of file


Reply via email to