Author: markt
Date: Fri Aug 17 19:16:56 2018
New Revision: 1838275

URL: http://svn.apache.org/viewvc?rev=1838275&view=rev
Log:
Additional fixes for output corruption of response bodies when writing large 
bodies using asynchronous processing over HTTP/2.

Added:
    tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java   (with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java
    tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: 
tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Fri 
Aug 17 19:16:56 2018
@@ -280,7 +280,7 @@ public class Http2AsyncUpgradeHandler ex
                 // Reserve as much as possible right away
                 int reservation = (sendfile.end - sendfile.pos > 
Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos);
                 sendfile.streamReservation  = 
sendfile.stream.reserveWindowSize(reservation, true);
-                sendfile.connectionReservation = 
reserveWindowSize(sendfile.stream, sendfile.streamReservation);
+                sendfile.connectionReservation = 
reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
             } catch (IOException e) {
                 return SendfileState.ERROR;
             }
@@ -340,7 +340,7 @@ public class Http2AsyncUpgradeHandler ex
                         int reservation = (sendfile.end - sendfile.pos > 
Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) (sendfile.end - sendfile.pos);
                         sendfile.streamReservation = 
sendfile.stream.reserveWindowSize(reservation, true);
                     }
-                    sendfile.connectionReservation = 
reserveWindowSize(sendfile.stream, sendfile.streamReservation);
+                    sendfile.connectionReservation = 
reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
                 }
             } catch (IOException e) {
                 failed (e, sendfile);

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Aug 
17 19:16:56 2018
@@ -723,7 +723,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    int reserveWindowSize(Stream stream, int reservation) throws IOException {
+    int reserveWindowSize(Stream stream, int reservation, boolean block) 
throws IOException {
         // Need to be holding the stream lock so releaseBacklog() can't notify
         // this thread until after this thread enters wait()
         int allocation = 0;
@@ -775,12 +775,16 @@ class Http2UpgradeHandler extends Abstra
                     }
                 }
                 if (allocation == 0) {
-                    try {
-                        stream.wait();
-                    } catch (InterruptedException e) {
-                        throw new IOException(sm.getString(
-                                
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
-                                stream.getIdentifier(), 
Integer.toString(reservation)), e);
+                    if (block) {
+                        try {
+                            stream.wait();
+                        } catch (InterruptedException e) {
+                            throw new IOException(sm.getString(
+                                    
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
+                                    stream.getIdentifier(), 
Integer.toString(reservation)), e);
+                        }
+                    } else {
+                        return 0;
                     }
                 }
             } while (allocation == 0);

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Aug 17 19:16:56 
2018
@@ -718,6 +718,7 @@ class Stream extends AbstractStream impl
         private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
         private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
         private volatile long written = 0;
+        private volatile int streamReservation = 0;
         private volatile boolean closed = false;
         private volatile boolean endOfStreamSent = false;
 
@@ -732,25 +733,31 @@ class Stream extends AbstractStream impl
                 throw new IllegalStateException(
                         sm.getString("stream.closed", getConnectionId(), 
getIdentifier()));
             }
-            int chunkLimit = chunk.limit();
-            int offset = 0;
-            while (chunk.remaining() > 0) {
-                int thisTime = Math.min(buffer.remaining(), chunk.remaining());
-                chunk.limit(chunk.position() + thisTime);
-                buffer.put(chunk);
-                chunk.limit(chunkLimit);
-                offset += thisTime;
-                if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
-                    // Only flush if we have more data to write and the buffer
-                    // is full
-                    if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
-                        writeBuffer.add(chunk);
-                        break;
+            int totalThisTime = 0;
+            if (writeBuffer.isEmpty()) {
+                int chunkLimit = chunk.limit();
+                while (chunk.remaining() > 0) {
+                    int thisTime = Math.min(buffer.remaining(), 
chunk.remaining());
+                    chunk.limit(chunk.position() + thisTime);
+                    buffer.put(chunk);
+                    chunk.limit(chunkLimit);
+                    totalThisTime += thisTime;
+                    if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+                        // Only flush if we have more data to write and the 
buffer
+                        // is full
+                        if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
+                            totalThisTime += chunk.remaining();
+                            writeBuffer.add(chunk);
+                            break;
+                        }
                     }
                 }
+            } else {
+                totalThisTime = chunk.remaining();
+                writeBuffer.add(chunk);
             }
-            written += offset;
-            return offset;
+            written += totalThisTime;
+            return totalThisTime;
         }
 
         final synchronized boolean flush(boolean block) throws IOException {
@@ -803,15 +810,22 @@ class Stream extends AbstractStream impl
             buffer.flip();
             int left = buffer.remaining();
             while (left > 0) {
-                int streamReservation  = reserveWindowSize(left, block);
                 if (streamReservation == 0) {
-                    // Must be non-blocking
-                    buffer.compact();
-                    return true;
+                    streamReservation  = reserveWindowSize(left, block);
+                    if (streamReservation == 0) {
+                        // Must be non-blocking
+                        buffer.compact();
+                        return true;
+                    }
                 }
                 while (streamReservation > 0) {
                     int connectionReservation =
-                                handler.reserveWindowSize(Stream.this, 
streamReservation);
+                                handler.reserveWindowSize(Stream.this, 
streamReservation, block);
+                    if (connectionReservation == 0) {
+                        // Must be non-blocking
+                        buffer.compact();
+                        return true;
+                    }
                     // Do the write
                     handler.writeBody(Stream.this, buffer, 
connectionReservation,
                             !writeInProgress && closed && left == 
connectionReservation &&
@@ -858,16 +872,14 @@ class Stream extends AbstractStream impl
         }
 
         @Override
-        public boolean writeFromBuffer(ByteBuffer src, boolean blocking) 
throws IOException {
+        public synchronized boolean writeFromBuffer(ByteBuffer src, boolean 
blocking) throws IOException {
             int chunkLimit = src.limit();
-            int offset = 0;
             while (src.remaining() > 0) {
                 int thisTime = Math.min(buffer.remaining(), src.remaining());
                 src.limit(src.position() + thisTime);
                 buffer.put(src);
                 src.limit(chunkLimit);
-                written += offset;
-                if (flush(true, blocking)) {
+                if (flush(false, blocking)) {
                     return true;
                 }
             }

Modified: tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java (original)
+++ tomcat/trunk/test/org/apache/coyote/http2/Http2TestBase.java Fri Aug 17 
19:16:56 2018
@@ -890,6 +890,7 @@ public abstract class Http2TestBase exte
         private ConnectionSettingsRemote remoteSettings = new 
ConnectionSettingsRemote("-1");
         private boolean traceBody = false;
         private ByteBuffer bodyBuffer = null;
+        private long bytesRead;
 
         public void setTraceBody(boolean traceBody) {
             this.traceBody = traceBody;
@@ -905,6 +906,7 @@ public abstract class Http2TestBase exte
         @Override
         public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize) 
{
             lastStreamId = Integer.toString(streamId);
+            bytesRead += payloadSize;
             if (traceBody) {
                 bodyBuffer = ByteBuffer.allocate(payloadSize);
                 return bodyBuffer;
@@ -1068,6 +1070,7 @@ public abstract class Http2TestBase exte
 
         public void clearTrace() {
             trace = new StringBuffer();
+            bytesRead = 0;
         }
 
 
@@ -1079,6 +1082,11 @@ public abstract class Http2TestBase exte
         public int getMaxFrameSize() {
             return remoteSettings.getMaxFrameSize();
         }
+
+
+        public long getBytesRead() {
+            return bytesRead;
+        }
     }
 
 
@@ -1101,6 +1109,8 @@ public abstract class Http2TestBase exte
 
         private static final long serialVersionUID = 1L;
 
+        public static final int CONTENT_LENGTH = 8192;
+
         @Override
         protected void doGet(HttpServletRequest req, HttpServletResponse resp)
                 throws ServletException, IOException {

Added: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java?rev=1838275&view=auto
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java (added)
+++ tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java Fri Aug 17 
19:16:56 2018
@@ -0,0 +1,211 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.Wrapper;
+import org.apache.catalina.startup.Tomcat;
+
+public class TestAsync extends Http2TestBase {
+
+    private static final int BLOCK_SIZE = 0x8000;
+
+    // https://bz.apache.org/bugzilla/show_bug.cgi?id=62614
+    @Test
+    public void testEmptyBothWindowsUpdateConnectionFirst() throws Exception {
+        doEmptyWindowTest(true, false, false);
+    }
+
+
+    @Test
+    public void testEmptyBothWindowsUpdateStreamFirst() throws Exception {
+        doEmptyWindowTest(false, false, false);
+    }
+
+
+    @Test
+    public void testEmptyConnectionWindowUpdateConnectionFirst() throws 
Exception {
+        doEmptyWindowTest(true, false, true);
+    }
+
+
+    @Test
+    public void testEmptyConnectionWindowUpdateStreamFirst() throws Exception {
+        doEmptyWindowTest(false, false, true);
+    }
+
+
+    @Test
+    public void testEmptyStreamWindowUpdateConnectionFirst() throws Exception {
+        doEmptyWindowTest(true, true, false);
+    }
+
+
+    @Test
+    public void testEmptyStreamWindowUpdateStreamFirst() throws Exception {
+        doEmptyWindowTest(false, true, false);
+    }
+
+
+    // No point testing when both Stream and Connection are unlimited
+
+
+    private void doEmptyWindowTest(boolean expandConnectionFirst, boolean 
connectionUnlimited,
+            boolean streamUnlimited) throws Exception {
+        int blockCount = 4;
+
+        enableHttp2();
+
+        Tomcat tomcat = getTomcatInstance();
+
+        Context ctxt = tomcat.addContext("", null);
+        Tomcat.addServlet(ctxt, "simple", new SimpleServlet());
+        ctxt.addServletMappingDecoded("/simple", "simple");
+        Wrapper w = Tomcat.addServlet(ctxt, "async", new 
AsyncServlet(blockCount));
+        w.setAsyncSupported(true);
+        ctxt.addServletMappingDecoded("/async", "async");
+        tomcat.start();
+
+        openClientConnection();
+        doHttpUpgrade();
+        sendClientPreface();
+        validateHttp2InitialResponse();
+
+        byte[] frameHeader = new byte[9];
+        ByteBuffer headersPayload = ByteBuffer.allocate(128);
+        buildGetRequest(frameHeader, headersPayload, null, 3, "/async");
+        writeFrame(frameHeader, headersPayload);
+
+        // Reset connection window size after intial response
+        sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH);
+
+        if (connectionUnlimited) {
+            // Effectively unlimited for this test
+            sendWindowUpdate(0, blockCount * BLOCK_SIZE * 2);
+        }
+        if (streamUnlimited) {
+            // Effectively unlimited for this test
+            sendWindowUpdate(3, blockCount * BLOCK_SIZE * 2);
+        }
+
+        // Headers
+        parser.readFrame(true);
+        // Body
+        int startingWindowSize = 
ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
+
+        while (output.getBytesRead() < startingWindowSize) {
+            parser.readFrame(true);
+        }
+
+        // Check that the right number of bytes were received
+        Assert.assertEquals(startingWindowSize, output.getBytesRead());
+
+        // Increase the Window size (50% of total body)
+        int windowSizeIncrease = blockCount * BLOCK_SIZE / 2;
+        if (expandConnectionFirst) {
+            sendWindowUpdate(0, windowSizeIncrease);
+            sendWindowUpdate(3, windowSizeIncrease);
+        } else {
+            sendWindowUpdate(3, windowSizeIncrease);
+            sendWindowUpdate(0, windowSizeIncrease);
+        }
+
+        while (output.getBytesRead() < startingWindowSize + 
windowSizeIncrease) {
+            parser.readFrame(true);
+        }
+
+        // Check that the right number of bytes were received
+        Assert.assertEquals(startingWindowSize + windowSizeIncrease, 
output.getBytesRead());
+
+        // Increase the Window size
+        if (expandConnectionFirst) {
+            sendWindowUpdate(0, windowSizeIncrease);
+            sendWindowUpdate(3, windowSizeIncrease);
+        } else {
+            sendWindowUpdate(3, windowSizeIncrease);
+            sendWindowUpdate(0, windowSizeIncrease);
+        }
+
+        while (!output.getTrace().endsWith("3-EndOfStream\n")) {
+            parser.readFrame(true);
+        }
+
+        // Check that the right number of bytes were received
+        Assert.assertEquals(blockCount * BLOCK_SIZE, output.getBytesRead());
+    }
+
+
+    public static class AsyncServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int blockLimit;
+
+        public AsyncServlet(int blockLimit) {
+            this.blockLimit = blockLimit;
+        }
+
+        @Override
+        protected void doGet(HttpServletRequest request, HttpServletResponse 
response)
+                throws IOException {
+
+            final AsyncContext asyncContext = request.startAsync();
+
+            response.setStatus(HttpServletResponse.SC_OK);
+            response.setContentType("application/binary");
+
+            final ServletOutputStream output = response.getOutputStream();
+            output.setWriteListener(new WriteListener() {
+
+                int blockCount;
+                byte[] bytes = new byte[BLOCK_SIZE];
+
+
+                @Override
+                public void onWritePossible() throws IOException {
+                    while (output.isReady()) {
+                        blockCount++;
+                        output.write(bytes);
+                        if (blockCount > blockLimit) {
+                            asyncContext.complete();
+                            return;
+                        }
+                    }
+                }
+
+
+                @Override
+                public void onError(Throwable t) {
+                    t.printStackTrace();
+                }
+            });
+        }
+    }
+}

Propchange: tomcat/trunk/test/org/apache/coyote/http2/TestAsync.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1838275&r1=1838274&r2=1838275&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 17 19:16:56 2018
@@ -55,6 +55,10 @@
         <bug>62620</bug>: Fix corruption of response bodies when writing large
         bodies using asynchronous processing over HTTP/2. (markt)
       </fix>
+      <fix>
+        Additional fixes for output corruption of response bodies when writing
+        large bodies using asynchronous processing over HTTP/2. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Other">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to