Author: markt Date: Mon Jan 12 08:59:42 2015 New Revision: 1651043 URL: http://svn.apache.org/r1651043 Log: A further round of refactoring of writes. Primary change is the use of socketWriteBuffer by default for all writes
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Mon Jan 12 08:59:42 2015 @@ -737,6 +737,7 @@ public class AjpProcessor<S> extends Abs cping = true; try { socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length); + socketWrapper.flush(true); } catch (IOException e) { setErrorState(ErrorState.CLOSE_NOW, e); } @@ -1035,6 +1036,7 @@ public class AjpProcessor<S> extends Abs // Request more data immediately if (!waitingForBodyMessage) { socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length); + socketWrapper.flush(true); waitingForBodyMessage = true; } @@ -1442,6 +1444,7 @@ public class AjpProcessor<S> extends Abs // Write to buffer responseMessage.end(); socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen()); + socketWrapper.flush(true); } @@ -1455,6 +1458,7 @@ public class AjpProcessor<S> extends Abs if (explicit && !finished) { // Send the flush message socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length); + socketWrapper.flush(true); } } @@ -1490,6 +1494,7 @@ public class AjpProcessor<S> extends Abs } else { socketWrapper.write(true, endMessageArray, 0, endMessageArray.length); } + socketWrapper.flush(true); } @@ -1556,6 +1561,7 @@ public class AjpProcessor<S> extends Abs responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime); responseMessage.end(); socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen()); + socketWrapper.flush(blocking); len -= thisTime; off += thisTime; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Mon Jan 12 08:59:42 2015 @@ -123,6 +123,12 @@ public class UpgradeServletOutputStream @Override + public void flush() throws IOException { + socketWrapper.flush(listener == null); + } + + + @Override public void close() throws IOException { closeRequired = true; socketWrapper.close(); @@ -130,7 +136,7 @@ public class UpgradeServletOutputStream private void preWriteChecks() { - if (listener != null && socketWrapper.hasDataToWrite()) { + if (listener != null && !socketWrapper.canWrite()) { throw new IllegalStateException(sm.getString("upgrade.sis.write.ise")); } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Jan 12 08:59:42 2015 @@ -2509,8 +2509,7 @@ public class AprEndpoint extends Abstrac @Override - protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) - throws IOException { + protected int doWrite(boolean block, boolean flip) throws IOException { if (closed) { throw new IOException(sm.getString("apr.closed", getSocket())); } @@ -2521,7 +2520,7 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { if (getBlockingStatus() == block) { - return doWriteInternal(bytebuffer, flip); + return doWriteInternal(flip); } } finally { readLock.unlock(); @@ -2541,7 +2540,7 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { writeLock.unlock(); - return doWriteInternal(bytebuffer, flip); + return doWriteInternal(flip); } finally { readLock.unlock(); } @@ -2555,10 +2554,9 @@ public class AprEndpoint extends Abstrac } - private int doWriteInternal(ByteBuffer bytebuffer, boolean flip) - throws IOException { + private int doWriteInternal(boolean flip) throws IOException { if (flip) { - bytebuffer.flip(); + socketWriteBuffer.flip(); writeBufferFlipped = true; } @@ -2571,7 +2569,7 @@ public class AprEndpoint extends Abstrac if (sslOutputBuffer.remaining() == 0) { // Buffer was fully written last time around sslOutputBuffer.clear(); - transfer(bytebuffer, sslOutputBuffer); + transfer(socketWriteBuffer, sslOutputBuffer); sslOutputBuffer.flip(); } else { // Buffer still has data from previous attempt to write @@ -2585,8 +2583,9 @@ public class AprEndpoint extends Abstrac sslOutputBuffer.position() + sslWritten); } } else { - thisTime = Socket.sendb(getSocket().longValue(), bytebuffer, - bytebuffer.position(), bytebuffer.limit() - bytebuffer.position()); + thisTime = Socket.sendb(getSocket().longValue(), + socketWriteBuffer, socketWriteBuffer.position(), + socketWriteBuffer.limit() - socketWriteBuffer.position()); } if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) { thisTime = 0; @@ -2601,11 +2600,11 @@ public class AprEndpoint extends Abstrac Integer.valueOf(-thisTime), getSocket(), this)); } written += thisTime; - bytebuffer.position(bytebuffer.position() + thisTime); - } while (thisTime > 0 && bytebuffer.hasRemaining()); + socketWriteBuffer.position(socketWriteBuffer.position() + thisTime); + } while (thisTime > 0 && socketWriteBuffer.hasRemaining()); - if (bytebuffer.remaining() == 0) { - bytebuffer.clear(); + if (socketWriteBuffer.remaining() == 0) { + socketWriteBuffer.clear(); writeBufferFlipped = false; } // If there is data left in the buffer the socket will be registered for Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Jan 12 08:59:42 2015 @@ -1064,7 +1064,10 @@ public class Nio2Endpoint extends Abstra @Override public void close() throws IOException { - getSocket().close(); + Nio2Channel socket = getSocket(); + if (socket != null) { + socket.close(); + } } @@ -1107,167 +1110,128 @@ public class Nio2Endpoint extends Abstra } + /** + * {@inheritDoc} + * <p> + * Overridden for NIO2 to enable a gathering write to be used to write + * all of the remaining data in a single additional write should a + * non-blocking write leave data in the buffer. + */ @Override - public void write(boolean block, byte[] buf, int off, int len) throws IOException { - if (len == 0 || getSocket() == null) - return; - - if (block) { - try { - do { - int thisTime = transfer(buf, off, len, socketWriteBuffer); - len = len - thisTime; - off = off + thisTime; - socketWriteBuffer.flip(); - while (socketWriteBuffer.hasRemaining()) { - if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { - throw new EOFException(sm.getString("iob.failedwrite")); - } - } - socketWriteBuffer.clear(); - } while (len > 0); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw new IOException(e); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (TimeoutException e) { - throw new SocketTimeoutException(); - } - } else { - // FIXME: Possible new behavior: - // If there's non blocking abuse (like a test writing 1MB in a single - // "non blocking" write), then block until the previous write is - // done rather than continue buffering - // Also allows doing autoblocking - // Could be "smart" with coordination with the main CoyoteOutputStream to - // indicate the end of a write - // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) - synchronized (writeCompletionHandler) { - if (writePending.tryAcquire()) { - // No pending completion handler, so writing to the main buffer - // is possible - int thisTime = transfer(buf, off, len, socketWriteBuffer); - len = len - thisTime; - off = off + thisTime; - if (len > 0) { - // Remaining data must be buffered - addToBuffers(buf, off, len); - } - flush(false, true); - } else { + protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException { + // FIXME: Possible new behavior: + // If there's non blocking abuse (like a test writing 1MB in a single + // "non blocking" write), then block until the previous write is + // done rather than continue buffering + // Also allows doing autoblocking + // Could be "smart" with coordination with the main CoyoteOutputStream to + // indicate the end of a write + // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) + synchronized (writeCompletionHandler) { + if (writePending.tryAcquire()) { + // No pending completion handler, so writing to the main buffer + // is possible + int thisTime = transfer(buf, off, len, socketWriteBuffer); + len = len - thisTime; + off = off + thisTime; + if (len > 0) { + // Remaining data must be buffered addToBuffers(buf, off, len); } + flushNonBlocking(true); + } else { + addToBuffers(buf, off, len); } } } @Override - protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) throws IOException { - // NO-OP for NIO2 since write(boolean, byte[], int, int) and - // flush(boolean, boolean) are over-ridden. - return 0; + protected int doWrite(boolean block, boolean flip) throws IOException { + // Only called in the non-blocking case since + // writeNonBlocking(byte[], int, int) and flush(boolean, boolean) + // are over-ridden. + + int result = -1; + try { + socketWriteBuffer.flip(); + result = socketWriteBuffer.remaining(); + while (socketWriteBuffer.hasRemaining()) { + if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { + throw new EOFException(sm.getString("iob.failedwrite")); + } + } + socketWriteBuffer.clear(); + return result; + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(e); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (TimeoutException e) { + throw new SocketTimeoutException(); + } } @Override - public boolean flush(boolean block) throws IOException { - if (getError() != null) { - throw getError(); + protected void flushBlocking() throws IOException { + // Before doing a blocking flush, make sure that any pending non + // blocking write has completed. + try { + if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) { + writePending.release(); + } else { + throw new SocketTimeoutException(); + } + } catch (InterruptedException e) { + // Ignore } - return super.flush(block); - } + super.flushBlocking(); + } @Override - protected boolean flush(boolean block, boolean hasPermit) throws IOException { - if (getSocket() == null) - return false; + protected boolean flushNonBlocking() { + return flushNonBlocking(false); + } - if (block) { - try { - if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) { - writePending.release(); - } else { - // TODO - } - } catch (InterruptedException e) { - // Ignore timeout - } - try { - if (bufferedWrites.size() > 0) { - for (ByteBufferHolder holder : bufferedWrites) { - holder.flip(); - ByteBuffer buffer = holder.getBuf(); - while (buffer.hasRemaining()) { - if (getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { - throw new EOFException(sm.getString("iob.failedwrite")); - } - } - } - bufferedWrites.clear(); - } + private boolean flushNonBlocking(boolean hasPermit) { + synchronized (writeCompletionHandler) { + if (hasPermit || writePending.tryAcquire()) { if (!writeBufferFlipped) { socketWriteBuffer.flip(); writeBufferFlipped = true; } - while (socketWriteBuffer.hasRemaining()) { - if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { - throw new EOFException(sm.getString("iob.failedwrite")); + if (bufferedWrites.size() > 0) { + // Gathering write of the main buffer plus all leftovers + ArrayList<ByteBuffer> arrayList = new ArrayList<>(); + if (socketWriteBuffer.hasRemaining()) { + arrayList.add(socketWriteBuffer); + } + for (ByteBufferHolder buffer : bufferedWrites) { + buffer.flip(); + arrayList.add(buffer.getBuf()); } - } - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); + bufferedWrites.clear(); + ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); + getSocket().write(array, 0, array.length, getTimeout(), + TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); + } else if (socketWriteBuffer.hasRemaining()) { + // Regular write + getSocket().write(socketWriteBuffer, getTimeout(), + TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler); } else { - throw new IOException(e); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (TimeoutException e) { - throw new SocketTimeoutException(); - } - socketWriteBuffer.clear(); - writeBufferFlipped = false; - return false; - } else { - synchronized (writeCompletionHandler) { - if (hasPermit || writePending.tryAcquire()) { - if (!writeBufferFlipped) { - socketWriteBuffer.flip(); - writeBufferFlipped = true; - } - if (bufferedWrites.size() > 0) { - // Gathering write of the main buffer plus all leftovers - ArrayList<ByteBuffer> arrayList = new ArrayList<>(); - if (socketWriteBuffer.hasRemaining()) { - arrayList.add(socketWriteBuffer); - } - for (ByteBufferHolder buffer : bufferedWrites) { - buffer.flip(); - arrayList.add(buffer.getBuf()); - } - bufferedWrites.clear(); - ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]); - getSocket().write(array, 0, array.length, getTimeout(), - TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); - } else if (socketWriteBuffer.hasRemaining()) { - // Regular write - getSocket().write(socketWriteBuffer, getTimeout(), - TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler); - } else { - // Nothing was written - writePending.release(); - socketWriteBuffer.clear(); - writeBufferFlipped = false; - } + // Nothing was written + writePending.release(); + socketWriteBuffer.clear(); + writeBufferFlipped = false; } - return hasDataToWrite(); } + return hasDataToWrite(); } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Jan 12 08:59:42 2015 @@ -1472,7 +1472,10 @@ public class NioEndpoint extends Abstrac @Override public void close() throws IOException { - getSocket().close(); + NioChannel socket = getSocket(); + if (socket != null) { + socket.close(); + } } @@ -1509,10 +1512,10 @@ public class NioEndpoint extends Abstrac @Override - protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) + protected synchronized int doWrite(boolean block, boolean flip) throws IOException { if (flip) { - bytebuffer.flip(); + socketWriteBuffer.flip(); writeBufferFlipped = true; } @@ -1525,7 +1528,7 @@ public class NioEndpoint extends Abstrac // Ignore } try { - written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block); + written = pool.write(socketWriteBuffer, getSocket(), selector, writeTimeout, block); // Make sure we are flushed do { if (getSocket().flush(true, selector, writeTimeout)) break; @@ -1535,8 +1538,8 @@ public class NioEndpoint extends Abstrac pool.put(selector); } } - if (bytebuffer.remaining() == 0) { - bytebuffer.clear(); + if (socketWriteBuffer.remaining() == 0) { + socketWriteBuffer.clear(); writeBufferFlipped = false; } // If there is data left in the buffer the socket will be registered for Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Mon Jan 12 08:59:42 2015 @@ -188,9 +188,9 @@ public abstract class SocketWrapperBase< } /** - * Checks to see if there is any writes pending and if there is calls + * Checks to see if there are any writes pending and if there are calls * {@link #registerWriteInterest()} to trigger a callback once the pending - * write has completed. + * writes have completed. * <p> * Note: Once this method has returned <code>false</code> it <b>MUST NOT</b> * be called again until the pending write has completed and the @@ -202,13 +202,19 @@ public abstract class SocketWrapperBase< * written otherwise <code>false</code> */ public boolean isReadyForWrite() { - boolean result = !hasDataToWrite(); + boolean result = canWrite(); if (!result) { registerWriteInterest(); } return result; } + + public boolean canWrite() { + return !writeBufferFlipped && socketWriteBuffer.hasRemaining() && + bufferedWrites.size() == 0; + } + public void addDispatch(DispatchType dispatchType) { synchronized (dispatches) { dispatches.add(dispatchType); @@ -286,79 +292,116 @@ public abstract class SocketWrapperBase< public abstract void unRead(ByteBuffer input); public abstract void close() throws IOException; + /** * Writes the provided data to the socket, buffering any remaining data if - * used in non-blocking mode. If any data remains in the buffers from a - * previous write then that data will be written before this data. It is - * therefore unnecessary to call flush() before calling this method. + * used in non-blocking mode. * * @param block <code>true<code> if a blocking write should be used, * otherwise a non-blocking write will be used - * @param b The byte array containing the data to be written + * @param buf The byte array containing the data to be written * @param off The offset within the byte array of the data to be written * @param len The length of the data to be written * * @throws IOException If an IO error occurs during the write */ - public void write(boolean block, byte[] b, int off, int len) throws IOException { - // Always flush any data remaining in the buffers - boolean dataLeft = flush(block, true); - - if (len == 0 || b == null) { + public void write(boolean block, byte[] buf, int off, int len) throws IOException { + if (len == 0 || buf == null || getSocket() == null) { return; } - // Keep writing until all the data is written or a non-blocking write - // leaves data in the buffer - while (!dataLeft && len > 0) { - int thisTime = transfer(b, off, len, socketWriteBuffer); - len = len - thisTime; - off = off + thisTime; - int written = doWrite(socketWriteBuffer, block, true); - if (written == 0) { - dataLeft = true; - } else { - dataLeft = flush(block, true); - } + // While the implementations for blocking and non-blocking writes are + // very similar they have been split into separate methods to allow + // sub-classes to override them individually. NIO2, for example, + // overrides the non-blocking write but not the blocking write. + if (block) { + writeBlocking(buf, off, len); + } else { + writeNonBlocking(buf, off, len); } - // Prevent timeouts for just doing client writes + // Prevent timeouts access(); + } - if (!block && len > 0) { - // Remaining data must be buffered - addToBuffers(b, off, len); + + /** + * Transfers the data to the socket write buffer (writing that data to the + * socket if the buffer fills up using a blocking write) until all the data + * has been transferred and space remains in the socket write buffer. + * + * @param buf The byte array containing the data to be written + * @param off The offset within the byte array of the data to be written + * @param len The length of the data to be written + * + * @throws IOException If an IO error occurs during the write + */ + protected void writeBlocking(byte[] buf, int off, int len) throws IOException { + // Note: There is an implementation assumption that if the switch from + // non-blocking to blocking has been made then any pending + // non-blocking writes were flushed at the time the switch + // occurred. + + // Keep writing until all the data has been transferred to the socket + // write buffer and space remains in that buffer + int thisTime = transfer(buf, off, len, socketWriteBuffer); + while (socketWriteBuffer.remaining() == 0) { + len = len - thisTime; + off = off + thisTime; + // TODO: There is an assumption here that the blocking write will + // block until all the data is written or the write times out. + // Document this assumption in the Javadoc for doWrite(), + // ensure it is valid for all implementations of doWrite() and + // then review all callers of doWrite() and review what + // simplifications this offers. + doWrite(true, true); + thisTime = transfer(buf, off, len, socketWriteBuffer); } } /** - * Writes as much data as possible from any that remains in the buffers. + * Transfers the data to the socket write buffer (writing that data to the + * socket if the buffer fills up using a non-blocking write) until either + * all the data has been transferred and space remains in the socket write + * buffer or a non-blocking write leaves data in the socket write buffer. * - * @param block <code>true<code> if a blocking write should be used, - * otherwise a non-blocking write will be used - * - * @return <code>true</code> if data remains to be flushed after this method - * completes, otherwise <code>false</code>. In blocking mode - * therefore, the return value should always be <code>false</code> + * @param buf The byte array containing the data to be written + * @param off The offset within the byte array of the data to be written + * @param len The length of the data to be written * * @throws IOException If an IO error occurs during the write */ - public boolean flush(boolean block) throws IOException { - return flush(block, false); + protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException { + if (!writeBufferFlipped) { + int thisTime = transfer(buf, off, len, socketWriteBuffer); + len = len - thisTime; + while (socketWriteBuffer.remaining() == 0) { + off = off + thisTime; + if (doWrite(false, !writeBufferFlipped) == 0) { + break; + } + if (writeBufferFlipped) { + thisTime = 0; + } else { + thisTime = transfer(buf, off, len, socketWriteBuffer); + } + len = len - thisTime; + } + } + + if (len > 0) { + // Remaining data must be buffered + addToBuffers(buf, off, len); + } } /** * Writes as much data as possible from any that remains in the buffers. - * This method exists for those implementations (e.g. NIO2) that need - * slightly different behaviour depending on if flush() was called directly - * or by another method in this class or a sub-class. - * - * @param block <code>true<code> if a blocking write should be used, - * otherwise a non-blocking write will be used - * @param internal <code>true<code> if flush() was called by another method - * in class or sub-class + * + * @param block <code>true<code> if a blocking write should be used, + * otherwise a non-blocking write will be used * * @return <code>true</code> if data remains to be flushed after this method * completes, otherwise <code>false</code>. In blocking mode @@ -366,16 +409,57 @@ public abstract class SocketWrapperBase< * * @throws IOException If an IO error occurs during the write */ - protected boolean flush(boolean block, boolean internal) throws IOException { + public boolean flush(boolean block) throws IOException { + if (getSocket() == null) { + return false; + } - // Prevent timeout for async + if (getError() != null) { + throw getError(); + } + + boolean result = false; + if (block) { + // A blocking flush will always empty the buffer. + flushBlocking(); + } else { + result = flushNonBlocking(); + } + + // Prevent timeouts access(); + return result; + } + + + protected void flushBlocking() throws IOException { + doWrite(true, !writeBufferFlipped); + + if (bufferedWrites.size() > 0) { + Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); + while (!hasMoreDataToFlush() && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { + transfer(buffer.getBuf(), socketWriteBuffer); + if (buffer.getBuf().remaining() == 0) { + bufIter.remove(); + } + doWrite(true, !writeBufferFlipped); + } + } + } + + } + + + protected boolean flushNonBlocking() throws IOException { boolean dataLeft = hasMoreDataToFlush(); // Write to the socket, if there is anything to write if (dataLeft) { - doWrite(socketWriteBuffer, block, !writeBufferFlipped); + doWrite(false, !writeBufferFlipped); } dataLeft = hasMoreDataToFlush(); @@ -385,13 +469,12 @@ public abstract class SocketWrapperBase< while (!hasMoreDataToFlush() && bufIter.hasNext()) { ByteBufferHolder buffer = bufIter.next(); buffer.flip(); - while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { + while (!hasMoreDataToFlush() && buffer.getBuf().remaining() > 0) { transfer(buffer.getBuf(), socketWriteBuffer); if (buffer.getBuf().remaining() == 0) { bufIter.remove(); } - doWrite(socketWriteBuffer, block, true); - //here we must break if we didn't finish the write + doWrite(false, !writeBufferFlipped); } } } @@ -400,8 +483,7 @@ public abstract class SocketWrapperBase< } - protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip) - throws IOException; + protected abstract int doWrite(boolean block, boolean flip) throws IOException; protected void addToBuffers(byte[] buf, int offset, int length) { Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Mon Jan 12 08:59:42 2015 @@ -88,7 +88,7 @@ public class WsRemoteEndpointImplServer // was written return; } - boolean complete = true; + boolean complete = false; try { // If this is false there will be a call back when it is true while (sos.isReady()) { @@ -103,6 +103,7 @@ public class WsRemoteEndpointImplServer } } if (complete) { + sos.flush(); wsWriteTimeout.unregister(this); clearHandler(null, useDispatch); if (close) { @@ -117,9 +118,9 @@ public class WsRemoteEndpointImplServer clearHandler(ioe, useDispatch); close(); } + if (!complete) { // Async write is in progress - long timeout = getSendTimeout(); if (timeout > 0) { // Register with timeout thread Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original) +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Mon Jan 12 08:59:42 2015 @@ -221,19 +221,19 @@ public class TestNonBlockingAPI extends boolean found = false; for (int i = totalBodyRead; i < (totalBodyRead + line.length()); i++) { if (DATA[i] != resultBytes[lineStart + i - totalBodyRead]) { - int dataStart = i - 16; + int dataStart = i - 64; if (dataStart < 0) { dataStart = 0; } - int dataEnd = i + 16; + int dataEnd = i + 64; if (dataEnd > DATA.length) { dataEnd = DATA.length; } - int resultStart = lineStart + i - totalBodyRead - 16; + int resultStart = lineStart + i - totalBodyRead - 64; if (resultStart < 0) { resultStart = 0; } - int resultEnd = lineStart + i - totalBodyRead + 16; + int resultEnd = lineStart + i - totalBodyRead + 64; if (resultEnd > resultString.length()) { resultEnd = resultString.length(); } @@ -492,25 +492,21 @@ public class TestNonBlockingAPI extends @Override public void onTimeout(AsyncEvent event) throws IOException { log.info("onTimeout"); - } @Override public void onStartAsync(AsyncEvent event) throws IOException { log.info("onStartAsync"); - } @Override public void onError(AsyncEvent event) throws IOException { log.info("AsyncListener.onError"); - } @Override public void onComplete(AsyncEvent event) throws IOException { log.info("onComplete"); - } }); // step 2 - notify on read Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java?rev=1651043&r1=1651042&r2=1651043&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java (original) +++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Mon Jan 12 08:59:42 2015 @@ -47,7 +47,6 @@ import static org.apache.catalina.startu import org.apache.catalina.Context; import org.apache.catalina.startup.Tomcat; import org.apache.catalina.startup.TomcatBaseTest; -import org.apache.catalina.util.IOTools; public class TestUpgrade extends TomcatBaseTest { @@ -234,8 +233,12 @@ public class TestUpgrade extends TomcatB try (ServletInputStream sis = connection.getInputStream(); ServletOutputStream sos = connection.getOutputStream()){ - - IOTools.flow(sis, sos); + byte[] buffer = new byte[8192]; + int read; + while ((read = sis.read(buffer)) >= 0) { + sos.write(buffer, 0, read); + sos.flush(); + } } catch (IOException ioe) { throw new IllegalStateException(ioe); } @@ -274,7 +277,7 @@ public class TestUpgrade extends TomcatB private class EchoReadListener extends NoOpReadListener { - private byte[] buffer = new byte[8096]; + private byte[] buffer = new byte[8192]; @Override public void onDataAvailable() { @@ -288,6 +291,7 @@ public class TestUpgrade extends TomcatB throw new IOException("Unable to echo data. " + "isReady() returned false"); } + sos.flush(); } } } catch (IOException ioe) { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org