Author: mahadev Date: Tue Dec 8 19:12:04 2009 New Revision: 888526 URL: http://svn.apache.org/viewvc?rev=888526&view=rev Log: ZOOKEEPER-597. ASyncHammerTest is failing intermittently on hudson trunk. (pat, ben and mahadev)
Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=888526&r1=888525&r2=888526&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Tue Dec 8 19:12:04 2009 @@ -35,6 +35,9 @@ ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created outside of normal processing (ben reed and mahadev via mahadev) + ZOOKEEPER-597. ASyncHammerTest is failing intermittently on hudson trunk. + (pat, ben and mahadev) + Release 3.2.1 - 2009-08-27 Backward compatible changes: Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=888526&r1=888525&r2=888526&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue Dec 8 19:12:04 2009 @@ -332,29 +332,35 @@ LinkedList<Request> outstanding = new LinkedList<Request>(); void sendBuffer(ByteBuffer bb) { - // We check if write interest here because if it is NOT set, nothing is queued, so - // we can try to send the buffer right away without waking up the selector - if ((sk.interestOps()&SelectionKey.OP_WRITE) == 0) { - try { - sock.write(bb); - } catch (IOException e) { - // we are just doing best effort right now + try { + // We check if write interest here because if it is NOT set, nothing + // is queued, so + // we can try to send the buffer right away without waking up the + // selector + if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { + try { + sock.write(bb); + } catch (IOException e) { + // we are just doing best effort right now + } } - } - // if there is nothing left to send, we are done - if (bb.remaining() == 0) { - return; - } - synchronized (factory) { - sk.selector().wakeup(); - if (LOG.isTraceEnabled()) { - LOG.trace("Add a buffer to outgoingBuffers, sk " + sk - + " is valid: " + sk.isValid()); - } - outgoingBuffers.add(bb); - if (sk.isValid()) { - sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); + // if there is nothing left to send, we are done + if (bb.remaining() == 0) { + return; } + synchronized (factory) { + sk.selector().wakeup(); + if (LOG.isTraceEnabled()) { + LOG.trace("Add a buffer to outgoingBuffers, sk " + sk + + " is valid: " + sk.isValid()); + } + outgoingBuffers.add(bb); + if (sk.isValid()) { + sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); + } + } + } catch (Exception e) { + LOG.warn("Unexpected Exception: ", e); } } @@ -884,36 +890,40 @@ * org.apache.jute.Record, java.lang.String) */ synchronized public void sendResponse(ReplyHeader h, Record r, String tag) { - if (closed) { - return; - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // Make space for length - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { - baos.write(fourBytes); - bos.writeRecord(h, "header"); - if (r != null) { - bos.writeRecord(r, tag); + if (closed) { + return; } - baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); - } - byte b[] = baos.toByteArray(); - ByteBuffer bb = ByteBuffer.wrap(b); - bb.putInt(b.length - 4).rewind(); - sendBuffer(bb); - if (h.getXid() > 0) { - synchronized (this.factory) { - outstandingRequests--; - // check throttling - if (zk.getInProcess() < factory.outstandingLimit - || outstandingRequests < 1) { - sk.selector().wakeup(); - enableRecv(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // Make space for length + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); + try { + baos.write(fourBytes); + bos.writeRecord(h, "header"); + if (r != null) { + bos.writeRecord(r, tag); } + baos.close(); + } catch (IOException e) { + LOG.error("Error serializing response"); } + byte b[] = baos.toByteArray(); + ByteBuffer bb = ByteBuffer.wrap(b); + bb.putInt(b.length - 4).rewind(); + sendBuffer(bb); + if (h.getXid() > 0) { + synchronized (this.factory) { + outstandingRequests--; + // check throttling + if (zk.getInProcess() < factory.outstandingLimit + || outstandingRequests < 1) { + sk.selector().wakeup(); + enableRecv(); + } + } + } + } catch (Exception e) { + LOG.warn("Unexpected exception. Destruction averted.", e); } } Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=888526&r1=888525&r2=888526&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original) +++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Tue Dec 8 19:12:04 2009 @@ -142,6 +142,8 @@ } } catch (InterruptedException e) { LOG.warn("Interrupted exception while waiting", e); + } catch(Throwable t) { + LOG.warn("Unexpected exception. Destruction averted.", t); } LOG.info("CommitProcessor exited loop!"); }