Author: mahadev
Date: Tue Dec  8 19:12:04 2009
New Revision: 888526

URL: http://svn.apache.org/viewvc?rev=888526&view=rev
Log:
ZOOKEEPER-597. ASyncHammerTest is failing intermittently on hudson trunk. (pat, 
ben and mahadev)

Modified:
    hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=888526&r1=888525&r2=888526&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Tue Dec  8 19:12:04 2009
@@ -35,6 +35,9 @@
   ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created
   outside of normal processing (ben reed and mahadev via mahadev)
 
+  ZOOKEEPER-597. ASyncHammerTest is failing intermittently on hudson trunk. 
+  (pat, ben and mahadev)
+
 Release 3.2.1 - 2009-08-27
 
 Backward compatible changes:

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=888526&r1=888525&r2=888526&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue Dec  8 19:12:04 2009
@@ -332,29 +332,35 @@
     LinkedList<Request> outstanding = new LinkedList<Request>();
 
     void sendBuffer(ByteBuffer bb) {
-        // We check if write interest here because if it is NOT set, nothing is queued, so
-        // we can try to send the buffer right away without waking up the selector
-        if ((sk.interestOps()&SelectionKey.OP_WRITE) == 0) {
-            try {
-                sock.write(bb);
-            } catch (IOException e) {
-                // we are just doing best effort right now
+        try {
+            // We check if write interest here because if it is NOT set, nothing
+            // is queued, so
+            // we can try to send the buffer right away without waking up the
+            // selector
+            if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
+                try {
+                    sock.write(bb);
+                } catch (IOException e) {
+                    // we are just doing best effort right now
+                }
             }
-        }
-        // if there is nothing left to send, we are done
-        if (bb.remaining() == 0) {
-            return;
-        }
-        synchronized (factory) {
-            sk.selector().wakeup();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
-                        + " is valid: " + sk.isValid());
-            }
-            outgoingBuffers.add(bb);
-            if (sk.isValid()) {
-                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
+            // if there is nothing left to send, we are done
+            if (bb.remaining() == 0) {
+                return;
             }
+            synchronized (factory) {
+                sk.selector().wakeup();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+                            + " is valid: " + sk.isValid());
+                }
+                outgoingBuffers.add(bb);
+                if (sk.isValid()) {
+                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Unexpected Exception: ", e);
         }
     }
 
@@ -884,36 +890,40 @@
      *      org.apache.jute.Record, java.lang.String)
      */
     synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
-        if (closed) {
-            return;
-        }
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        // Make space for length
-        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
         try {
-            baos.write(fourBytes);
-            bos.writeRecord(h, "header");
-            if (r != null) {
-                bos.writeRecord(r, tag);
+            if (closed) {
+                return;
             }
-            baos.close();
-        } catch (IOException e) {
-            LOG.error("Error serializing response");
-        }
-        byte b[] = baos.toByteArray();
-        ByteBuffer bb = ByteBuffer.wrap(b);
-        bb.putInt(b.length - 4).rewind();
-        sendBuffer(bb);
-        if (h.getXid() > 0) {
-            synchronized (this.factory) {
-                outstandingRequests--;
-                // check throttling
-                if (zk.getInProcess() < factory.outstandingLimit
-                        || outstandingRequests < 1) {
-                    sk.selector().wakeup();
-                    enableRecv();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            // Make space for length
+            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+            try {
+                baos.write(fourBytes);
+                bos.writeRecord(h, "header");
+                if (r != null) {
+                    bos.writeRecord(r, tag);
                 }
+                baos.close();
+            } catch (IOException e) {
+                LOG.error("Error serializing response");
             }
+            byte b[] = baos.toByteArray();
+            ByteBuffer bb = ByteBuffer.wrap(b);
+            bb.putInt(b.length - 4).rewind();
+            sendBuffer(bb);
+            if (h.getXid() > 0) {
+                synchronized (this.factory) {
+                    outstandingRequests--;
+                    // check throttling
+                    if (zk.getInProcess() < factory.outstandingLimit
+                            || outstandingRequests < 1) {
+                        sk.selector().wakeup();
+                        enableRecv();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Unexpected exception. Destruction averted.", e);
         }
     }
 

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=888526&r1=888525&r2=888526&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Tue Dec  8 19:12:04 2009
@@ -142,6 +142,8 @@
             }
         } catch (InterruptedException e) {
             LOG.warn("Interrupted exception while waiting", e);
+        } catch(Throwable t) {
+            LOG.warn("Unexpected exception. Destruction averted.", t);
         }
         LOG.info("CommitProcessor exited loop!");
     }


Reply via email to