Author: markt Date: Fri Aug 17 19:16:56 2018 New Revision: 1838275 URL: http://svn.apache.org/viewvc?rev=1838275&view=rev Log: Additional fixes for output corruption of response bodies when writing large bodies using asynchronous processing over HTTP/2.
Added: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Stream.java tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Fri Aug 17 19:16:56 2018 @@ -280,7 +280,7 @@ public class Http2AsyncUpgradeHandler ex // Reserve as much as possible right away int reservation = (sendfile.end - sendfile.pos > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos); sendfile.streamReservation = sendfile.stream.reserveWindowSize(reservation, true); - sendfile.connectionReservation = reserveWindowSize(sendfile.stream, sendfile.streamReservation); + sendfile.connectionReservation = reserveWindowSize(sendfile.stream, sendfile.streamReservation, true); } catch (IOException e) { return SendfileState.ERROR; } @@ -340,7 +340,7 @@ public class Http2AsyncUpgradeHandler ex int reservation = (sendfile.end - sendfile.pos > Integer.MAX_VALUE) ? 
Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos); sendfile.streamReservation = sendfile.stream.reserveWindowSize(reservation, true); } - sendfile.connectionReservation = reserveWindowSize(sendfile.stream, sendfile.streamReservation); + sendfile.connectionReservation = reserveWindowSize(sendfile.stream, sendfile.streamReservation, true); } } catch (IOException e) { failed (e, sendfile); Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Aug 17 19:16:56 2018 @@ -723,7 +723,7 @@ class Http2UpgradeHandler extends Abstra } - int reserveWindowSize(Stream stream, int reservation) throws IOException { + int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException { // Need to be holding the stream lock so releaseBacklog() can't notify // this thread until after this thread enters wait() int allocation = 0; @@ -775,12 +775,16 @@ class Http2UpgradeHandler extends Abstra } } if (allocation == 0) { - try { - stream.wait(); - } catch (InterruptedException e) { - throw new IOException(sm.getString( - "upgradeHandler.windowSizeReservationInterrupted", connectionId, - stream.getIdentifier(), Integer.toString(reservation)), e); + if (block) { + try { + stream.wait(); + } catch (InterruptedException e) { + throw new IOException(sm.getString( + "upgradeHandler.windowSizeReservationInterrupted", connectionId, + stream.getIdentifier(), Integer.toString(reservation)), e); + } + } else { + return 0; } } } while (allocation == 0); Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838275&r1=1838274&r2=1838275&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Aug 17 19:16:56 2018 @@ -718,6 +718,7 @@ class Stream extends AbstractStream impl private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024); private volatile long written = 0; + private volatile int streamReservation = 0; private volatile boolean closed = false; private volatile boolean endOfStreamSent = false; @@ -732,25 +733,31 @@ class Stream extends AbstractStream impl throw new IllegalStateException( sm.getString("stream.closed", getConnectionId(), getIdentifier())); } - int chunkLimit = chunk.limit(); - int offset = 0; - while (chunk.remaining() > 0) { - int thisTime = Math.min(buffer.remaining(), chunk.remaining()); - chunk.limit(chunk.position() + thisTime); - buffer.put(chunk); - chunk.limit(chunkLimit); - offset += thisTime; - if (chunk.remaining() > 0 && !buffer.hasRemaining()) { - // Only flush if we have more data to write and the buffer - // is full - if (flush(true, coyoteResponse.getWriteListener() == null)) { - writeBuffer.add(chunk); - break; + int totalThisTime = 0; + if (writeBuffer.isEmpty()) { + int chunkLimit = chunk.limit(); + while (chunk.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), chunk.remaining()); + chunk.limit(chunk.position() + thisTime); + buffer.put(chunk); + chunk.limit(chunkLimit); + totalThisTime += thisTime; + if (chunk.remaining() > 0 && !buffer.hasRemaining()) { + // Only flush if we have more data to write and the buffer + // is full + if (flush(true, coyoteResponse.getWriteListener() == null)) { + totalThisTime += chunk.remaining(); + writeBuffer.add(chunk); + break; + } } } + } else { + totalThisTime 
= chunk.remaining(); + writeBuffer.add(chunk); } - written += offset; - return offset; + written += totalThisTime; + return totalThisTime; } final synchronized boolean flush(boolean block) throws IOException { @@ -803,15 +810,22 @@ class Stream extends AbstractStream impl buffer.flip(); int left = buffer.remaining(); while (left > 0) { - int streamReservation = reserveWindowSize(left, block); if (streamReservation == 0) { - // Must be non-blocking - buffer.compact(); - return true; + streamReservation = reserveWindowSize(left, block); + if (streamReservation == 0) { + // Must be non-blocking + buffer.compact(); + return true; + } } while (streamReservation > 0) { int connectionReservation = - handler.reserveWindowSize(Stream.this, streamReservation); + handler.reserveWindowSize(Stream.this, streamReservation, block); + if (connectionReservation == 0) { + // Must be non-blocking + buffer.compact(); + return true; + } // Do the write handler.writeBody(Stream.this, buffer, connectionReservation, !writeInProgress && closed && left == connectionReservation && @@ -858,16 +872,14 @@ class Stream extends AbstractStream impl } @Override - public boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { + public synchronized boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { int chunkLimit = src.limit(); - int offset = 0; while (src.remaining() > 0) { int thisTime = Math.min(buffer.remaining(), src.remaining()); src.limit(src.position() + thisTime); buffer.put(src); src.limit(chunkLimit); - written += offset; - if (flush(true, blocking)) { + if (flush(false, blocking)) { return true; } } Modified: tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java?rev=1838275&r1=1838274&r2=1838275&view=diff ============================================================================== --- 
tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java (original) +++ tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java Fri Aug 17 19:16:56 2018 @@ -890,6 +890,7 @@ public abstract class Http2TestBase exte private ConnectionSettingsRemote remoteSettings = new ConnectionSettingsRemote("-1"); private boolean traceBody = false; private ByteBuffer bodyBuffer = null; + private long bytesRead; public void setTraceBody(boolean traceBody) { this.traceBody = traceBody; @@ -905,6 +906,7 @@ public abstract class Http2TestBase exte @Override public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize) { lastStreamId = Integer.toString(streamId); + bytesRead += payloadSize; if (traceBody) { bodyBuffer = ByteBuffer.allocate(payloadSize); return bodyBuffer; @@ -1068,6 +1070,7 @@ public abstract class Http2TestBase exte public void clearTrace() { trace = new StringBuffer(); + bytesRead = 0; } @@ -1079,6 +1082,11 @@ public abstract class Http2TestBase exte public int getMaxFrameSize() { return remoteSettings.getMaxFrameSize(); } + + + public long getBytesRead() { + return bytesRead; + } } @@ -1101,6 +1109,8 @@ public abstract class Http2TestBase exte private static final long serialVersionUID = 1L; + public static final int CONTENT_LENGTH = 8192; + @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { Added: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java?rev=1838275&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java (added) +++ tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java Fri Aug 17 19:16:56 2018 @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http2; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.Wrapper; +import org.apache.catalina.startup.Tomcat; + +public class TestAsync extends Http2TestBase { + + private static final int BLOCK_SIZE = 0x8000; + + // https://bz.apache.org/bugzilla/show_bug.cgi?id=62614 + @Test + public void testEmptyBothWindowsUpdateConnectionFirst() throws Exception { + doEmptyWindowTest(true, false, false); + } + + + @Test + public void testEmptyBothWindowsUpdateStreamFirst() throws Exception { + doEmptyWindowTest(false, false, false); + } + + + @Test + public void testEmptyConnectionWindowUpdateConnectionFirst() throws Exception { + doEmptyWindowTest(true, false, true); + } + + + @Test + public void testEmptyConnectionWindowUpdateStreamFirst() throws Exception { + doEmptyWindowTest(false, false, true); + } + + + @Test + public void testEmptyStreamWindowUpdateConnectionFirst() throws 
Exception { + doEmptyWindowTest(true, true, false); + } + + + @Test + public void testEmptyStreamWindowUpdateStreamFirst() throws Exception { + doEmptyWindowTest(false, true, false); + } + + + // No point testing when both Stream and Connection are unlimited + + + private void doEmptyWindowTest(boolean expandConnectionFirst, boolean connectionUnlimited, + boolean streamUnlimited) throws Exception { + int blockCount = 4; + + enableHttp2(); + + Tomcat tomcat = getTomcatInstance(); + + Context ctxt = tomcat.addContext("", null); + Tomcat.addServlet(ctxt, "simple", new SimpleServlet()); + ctxt.addServletMappingDecoded("/simple", "simple"); + Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncServlet(blockCount)); + w.setAsyncSupported(true); + ctxt.addServletMappingDecoded("/async", "async"); + tomcat.start(); + + openClientConnection(); + doHttpUpgrade(); + sendClientPreface(); + validateHttp2InitialResponse(); + + byte[] frameHeader = new byte[9]; + ByteBuffer headersPayload = ByteBuffer.allocate(128); + buildGetRequest(frameHeader, headersPayload, null, 3, "/async"); + writeFrame(frameHeader, headersPayload); + + // Reset connection window size after initial response + sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH); + + if (connectionUnlimited) { + // Effectively unlimited for this test + sendWindowUpdate(0, blockCount * BLOCK_SIZE * 2); + } + if (streamUnlimited) { + // Effectively unlimited for this test + sendWindowUpdate(3, blockCount * BLOCK_SIZE * 2); + } + + // Headers + parser.readFrame(true); + // Body + int startingWindowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; + + while (output.getBytesRead() < startingWindowSize) { + parser.readFrame(true); + } + + // Check that the right number of bytes were received + Assert.assertEquals(startingWindowSize, output.getBytesRead()); + + // Increase the Window size (50% of total body) + int windowSizeIncrease = blockCount * BLOCK_SIZE / 2; + if (expandConnectionFirst) { + sendWindowUpdate(0, 
windowSizeIncrease); + sendWindowUpdate(3, windowSizeIncrease); + } else { + sendWindowUpdate(3, windowSizeIncrease); + sendWindowUpdate(0, windowSizeIncrease); + } + + while (output.getBytesRead() < startingWindowSize + windowSizeIncrease) { + parser.readFrame(true); + } + + // Check that the right number of bytes were received + Assert.assertEquals(startingWindowSize + windowSizeIncrease, output.getBytesRead()); + + // Increase the Window size + if (expandConnectionFirst) { + sendWindowUpdate(0, windowSizeIncrease); + sendWindowUpdate(3, windowSizeIncrease); + } else { + sendWindowUpdate(3, windowSizeIncrease); + sendWindowUpdate(0, windowSizeIncrease); + } + + while (!output.getTrace().endsWith("3-EndOfStream\n")) { + parser.readFrame(true); + } + + // Check that the right number of bytes were received + Assert.assertEquals(blockCount * BLOCK_SIZE, output.getBytesRead()); + } + + + public static class AsyncServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + private final int blockLimit; + + public AsyncServlet(int blockLimit) { + this.blockLimit = blockLimit; + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException { + + final AsyncContext asyncContext = request.startAsync(); + + response.setStatus(HttpServletResponse.SC_OK); + response.setContentType("application/binary"); + + final ServletOutputStream output = response.getOutputStream(); + output.setWriteListener(new WriteListener() { + + int blockCount; + byte[] bytes = new byte[BLOCK_SIZE]; + + + @Override + public void onWritePossible() throws IOException { + while (output.isReady()) { + blockCount++; + output.write(bytes); + if (blockCount > blockLimit) { + asyncContext.complete(); + return; + } + } + } + + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }); + } + } +} Propchange: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java 
------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838275&r1=1838274&r2=1838275&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 17 19:16:56 2018 @@ -55,6 +55,10 @@ <bug>62620</bug>: Fix corruption of response bodies when writing large bodies using asynchronous processing over HTTP/2. (markt) </fix> + <fix> + Additional fixes for output corruption of response bodies when writing + large bodies using asynchronous processing over HTTP/2. (markt) + </fix> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org