This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 3bc67c05d97c172eb7a9def2e54e1e1c6f082a82
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Jun 3 14:15:59 2019 +0100

    Refactor Stream / Connection flow control window allocation
---
 .../apache/coyote/http2/Http2UpgradeHandler.java   |  78 +++------
 .../apache/coyote/http2/LocalStrings.properties    |  11 +-
 java/org/apache/coyote/http2/Stream.java           |  50 +++---
 .../coyote/http2/WindowAllocationManager.java      | 189 +++++++++++++++++++++
 4 files changed, 243 insertions(+), 85 deletions(-)

diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 8abd92b..cbfbd3b 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -37,12 +37,10 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.http.WebConnection;
 
-import org.apache.coyote.ActionCode;
 import org.apache.coyote.Adapter;
 import org.apache.coyote.CloseNowException;
 import org.apache.coyote.ProtocolException;
 import org.apache.coyote.Request;
-import org.apache.coyote.Response;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http2.HpackDecoder.HeaderEmitter;
 import org.apache.coyote.http2.HpackEncoder.State;
@@ -751,11 +749,10 @@ public class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpU
 
 
     int reserveWindowSize(Stream stream, int reservation, boolean block) 
throws IOException {
-        // Need to be holding the connection allocation lock so 
releaseBacklog()
-        // can't notify this thread until after this thread enters wait()
+        // Need to be holding the stream lock so releaseBacklog() can't notify
+        // this thread until after this thread enters wait()
         int allocation = 0;
-        Object connectionAllocationLock = stream.getConnectionAllocationLock();
-        synchronized (connectionAllocationLock) {
+        synchronized (stream) {
             do {
                 synchronized (this) {
                     if (!stream.canWrite()) {
@@ -808,38 +805,35 @@ public class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpU
                             // request is for a stream, use the connection
                             // timeout
                             long writeTimeout = protocol.getWriteTimeout();
-                            if (writeTimeout < 0) {
-                                connectionAllocationLock.wait();
-                            } else {
-                                connectionAllocationLock.wait(writeTimeout);
-                                // Has this stream been granted an allocation
-                                // Note: If the stream in not in this Map then 
the
-                                //       requested write has been fully 
allocated
-                                BacklogTracker tracker;
-                                // Ensure allocations made in other threads 
are visible
-                                synchronized (this) {
-                                    tracker = backLogStreams.get(stream);
+                            stream.waitForConnectionAllocation(writeTimeout);
+                            // Has this stream been granted an allocation
+                            // Note: If the stream is not in this Map then the
+                            //       requested write has been fully allocated
+                            BacklogTracker tracker;
+                            // Ensure allocations made in other threads are 
visible
+                            synchronized (this) {
+                                tracker = backLogStreams.get(stream);
+                            }
+                            if (tracker != null && 
tracker.getUnusedAllocation() == 0) {
+                                if (log.isDebugEnabled()) {
+                                    
log.debug(sm.getString("upgradeHandler.noAllocation",
+                                            connectionId, 
stream.getIdentifier()));
                                 }
-                                if (tracker != null && 
tracker.getUnusedAllocation() == 0) {
-                                    if (log.isDebugEnabled()) {
-                                        
log.debug(sm.getString("upgradeHandler.noAllocation",
-                                                connectionId, 
stream.getIdentifier()));
-                                    }
-                                    // No allocation
-                                    // Close the connection. Do this first 
since
-                                    // closing the stream will raise an 
exception
-                                    close();
-                                    // Close the stream (in app code so need to
-                                    // signal to app stream is closing)
-                                    stream.doWriteTimeout();
+                                // No allocation
+                                // Close the connection. Do this first since
+                                // closing the stream will raise an exception
+                                close();
+                                // Close the stream (in app code so need to
+                                // signal to app stream is closing)
+                                stream.doWriteTimeout();
                                 }
-                            }
                         } catch (InterruptedException e) {
                             throw new IOException(sm.getString(
                                     
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
                                     stream.getIdentifier(), 
Integer.toString(reservation)), e);
                         }
                     } else {
+                        stream.waitForConnectionAllocationNonBlocking();
                         return 0;
                     }
                 }
@@ -876,29 +870,7 @@ public class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpU
                 if (this == stream) {
                     continue;
                 }
-                Response coyoteResponse = ((Stream) 
stream).getCoyoteResponse();
-                if (coyoteResponse.getWriteListener() == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("upgradeHandler.notify",
-                            connectionId, stream.getIdentifier()));
-                    }
-                    // Blocking, so use notify to release StreamOutputBuffer
-                    Object connectionAllocationLock = ((Stream) 
stream).getConnectionAllocationLock();
-                    synchronized (connectionAllocationLock) {
-                        connectionAllocationLock.notify();
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("upgradeHandler.dispatchWrite",
-                            connectionId, stream.getIdentifier()));
-                    }
-                    // Non-blocking so dispatch
-                    coyoteResponse.action(ActionCode.DISPATCH_WRITE, null);
-                    // Need to explicitly execute dispatches on the
-                    // StreamProcessor as this thread is being processed by an
-                    // UpgradeProcessor which won't see this dispatch
-                    coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null);
-                }
+                ((Stream) stream).notifyConnection();
             }
         }
     }
diff --git a/java/org/apache/coyote/http2/LocalStrings.properties 
b/java/org/apache/coyote/http2/LocalStrings.properties
index dc3bd1c..47d8c3a 100644
--- a/java/org/apache/coyote/http2/LocalStrings.properties
+++ b/java/org/apache/coyote/http2/LocalStrings.properties
@@ -118,7 +118,6 @@ upgradeHandler.allocate.left=Connection [{0}], Stream 
[{1}], [{2}] bytes unalloc
 upgradeHandler.allocate.recipient=Connection [{0}], Stream [{1}], potential 
recipient [{2}] with weight [{3}]
 upgradeHandler.connectionError=Connection error
 upgradeHandler.dependency.invalid=Connection [{0}], Stream [{1}], Streams may 
not depend on themselves
-upgradeHandler.dispatchWrite=Connection [{0}], Stream [{1}], Dispatching to 
container thread for async write
 upgradeHandler.goaway.debug=Connection [{0}], Goaway, Last stream [{1}], Error 
code [{2}], Debug data [{3}]
 upgradeHandler.init=Connection [{0}], State [{1}]
 upgradeHandler.initialWindowSize.invalid=Connection [{0}], Illegal value of 
[{1}] ignored for initial window size
@@ -126,7 +125,6 @@ upgradeHandler.invalidPreface=Connection [{0}], Invalid 
connection preface
 upgradeHandler.ioerror=Connection [{0}]
 upgradeHandler.noAllocation=Connection [{0}], Stream [{1}], Timeout waiting 
for allocation
 upgradeHandler.noNewStreams=Connection [{0}], Stream [{1}], Stream ignored as 
no new streams are permitted on this connection
-upgradeHandler.notify=Connection [{0}], Stream [{1}], notify() called to 
release StreamOutputBuffer
 upgradeHandler.pause.entry=Connection [{0}] Pausing
 upgradeHandler.prefaceReceived=Connection [{0}], Connection preface received 
from client
 upgradeHandler.pingFailed=Connection [{0}] Failed to send ping to client
@@ -156,5 +154,14 @@ upgradeHandler.writeBody=Connection [{0}], Stream [{1}], 
Data length [{2}]
 upgradeHandler.writeHeaders=Connection [{0}], Stream [{1}]
 upgradeHandler.writePushHeaders=Connection [{0}], Stream [{1}], Pushed stream 
[{2}], EndOfStream [{3}]
 
+windowAllocationManager.dispatched=Connection [{0}], Stream [{1}], Dispatched
+windowAllocationManager.notify=Connection [{0}], Stream [{1}], Waiting type 
[{2}], Notify type [{3}]
+windowAllocationManager.notified=Connection [{0}], Stream [{1}], Notified
+windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], 
Waiting for Connection flow control window (blocking) with timeout [{2}]
+windowAllocationManager.waitFor.stream=Connection [{0}], Stream [{1}], Waiting 
for Stream flow control window (blocking) with timeout [{2}]
+windowAllocationManager.waitFor.ise=Connection [{0}], Stream [{1}], Already 
waiting
+windowAllocationManager.waitForNonBlocking.connection=Connection [{0}], Stream 
[{1}], Waiting for Connection flow control window (non-blocking)
+windowAllocationManager.waitForNonBlocking.stream=Connection [{0}], Stream 
[{1}], Waiting for Stream flow control window (non-blocking)
+
 writeStateMachine.endWrite.ise=It is illegal to specify [{0}] for the new 
state once a write has completed
 writeStateMachine.ise=It is illegal to call [{0}()] in state [{1}]
diff --git a/java/org/apache/coyote/http2/Stream.java 
b/java/org/apache/coyote/http2/Stream.java
index 7f783fd..cc76200 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -69,7 +69,7 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
 
     private final Http2UpgradeHandler handler;
     private final StreamStateMachine state;
-    private final Object connectionAllocationLock = new Object();
+    private final WindowAllocationManager allocationManager = new 
WindowAllocationManager(this);
 
     // State machine would be too much overhead
     private int headerState = HEADER_STATE_START;
@@ -234,14 +234,7 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
 
 
     final void cancelAllocationRequests() {
-        // Cancel wait on stream allocation request (if any)
-        synchronized (this) {
-            this.notify();
-        }
-        // Cancel wait on connection allocation request (if any)
-        synchronized (connectionAllocationLock) {
-            connectionAllocationLock.notify();
-        }
+        allocationManager.notifyAny();
     }
 
 
@@ -259,17 +252,7 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
         boolean notify = getWindowSize() < 1;
         super.incrementWindowSize(windowSizeIncrement);
         if (notify && getWindowSize() > 0) {
-            if (coyoteResponse.getWriteListener() == null) {
-                // Blocking, so use notify to release StreamOutputBuffer
-                notify();
-            } else {
-                // Non-blocking so dispatch
-                coyoteResponse.action(ActionCode.DISPATCH_WRITE, null);
-                // Need to explicitly execute dispatches on the StreamProcessor
-                // as this thread is being processed by an UpgradeProcessor
-                // which won't see this dispatch
-                coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null);
-            }
+            allocationManager.notifyStream();
         }
     }
 
@@ -284,11 +267,7 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
             if (block) {
                 try {
                     long writeTimeout = 
handler.getProtocol().getStreamWriteTimeout();
-                    if (writeTimeout < 0) {
-                        wait();
-                    } else {
-                        wait(writeTimeout);
-                    }
+                    allocationManager.waitForStream(writeTimeout);
                     windowSize = getWindowSize();
                     if (windowSize == 0) {
                         doWriteTimeout();
@@ -300,6 +279,7 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
                     throw new IOException(e);
                 }
             } else {
+                allocationManager.waitForStreamNonBlocking();
                 return 0;
             }
         }
@@ -329,6 +309,21 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
     }
 
 
+    void waitForConnectionAllocation(long timeout) throws InterruptedException 
{
+        allocationManager.waitForConnection(timeout);
+    }
+
+
+    void waitForConnectionAllocationNonBlocking() {
+        allocationManager.waitForConnectionNonBlocking();
+    }
+
+
+    void notifyConnection() {
+        allocationManager.notifyConnection();
+    }
+
+
     @Override
     @Deprecated
     protected synchronized void doNotifyAll() {
@@ -728,11 +723,6 @@ public class Stream extends AbstractStream implements 
HeaderEmitter {
     }
 
 
-    Object getConnectionAllocationLock() {
-        return connectionAllocationLock;
-    }
-
-
     private static void push(final Http2UpgradeHandler handler, final Request 
request,
             final Stream stream) throws IOException {
         if (org.apache.coyote.Constants.IS_SECURITY_ENABLED) {
diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java 
b/java/org/apache/coyote/http2/WindowAllocationManager.java
new file mode 100644
index 0000000..7626bf3
--- /dev/null
+++ b/java/org/apache/coyote/http2/WindowAllocationManager.java
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import org.apache.coyote.ActionCode;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * Tracks whether the stream is waiting for an allocation to the stream flow
+ * control window, to the connection flow control window or not waiting for an
+ * allocation and only issues allocation notifications when the stream is known
+ * to be waiting for the notification.
+ *
+ * It is possible for a stream to be waiting for a connection allocation when
+ * a stream allocation is made. Therefore this class tracks the type of
+ * allocation that the stream is waiting for to ensure that notifications are
+ * correctly triggered.
+ *
+ * With the implementation at the time of writing, it is not possible for a
+ * stream to receive an unexpected connection notification as these are only
+ * issued to streams in the backlog and a stream must be waiting for a
+ * connection allocation in order to be placed on the backlog. However, as a
+ * precaution, this class protects against unexpected connection notifications.
+ *
+ * It is important for asynchronous processing not to notify unless a
+ * notification is expected else a dispatch will be performed unnecessarily
+ * which may lead to unexpected results.
+ *
+ * A previous implementation used separate locks for the stream and connection
+ * notifications. However, correct handling of allocation waiting requires
+ * holding the stream lock when making the decision to wait. Therefore both
+ * allocations need to wait on the Stream.
+ */
+class WindowAllocationManager {
+
+    private static final Log log = 
LogFactory.getLog(WindowAllocationManager.class);
+    private static final StringManager sm = 
StringManager.getManager(WindowAllocationManager.class);
+
+    private static final int NONE = 0;
+    private static final int STREAM = 1;
+    private static final int CONNECTION = 2;
+
+    private final Stream stream;
+
+    private int waitingFor = NONE;
+
+    WindowAllocationManager(Stream stream) {
+        this.stream = stream;
+    }
+
+    void waitForStream(long timeout) throws InterruptedException {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("windowAllocationManager.waitFor.stream",
+                    stream.getConnectionId(), stream.getIdentifier(), 
Long.toString(timeout)));
+        }
+
+        waitFor(STREAM, timeout);
+    }
+
+
+    void waitForConnection(long timeout) throws InterruptedException {
+        if (log.isDebugEnabled()) {
+            
log.debug(sm.getString("windowAllocationManager.waitFor.connection",
+                    stream.getConnectionId(), stream.getIdentifier(), 
Long.toString(timeout)));
+        }
+
+        waitFor(CONNECTION, timeout);
+    }
+
+
+    void waitForStreamNonBlocking() {
+        if (log.isDebugEnabled()) {
+            
log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.stream",
+                    stream.getConnectionId(), stream.getIdentifier()));
+        }
+
+        waitForNonBlocking(STREAM);
+    }
+
+
+    void waitForConnectionNonBlocking() {
+        if (log.isDebugEnabled()) {
+            
log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.connection",
+                    stream.getConnectionId(), stream.getIdentifier()));
+        }
+
+        waitForNonBlocking(CONNECTION);
+    }
+
+
+    void notifyStream() {
+        notify(STREAM);
+    }
+
+
+    void notifyConnection() {
+        notify(CONNECTION);
+    }
+
+
+    void notifyAny() {
+        notify(STREAM | CONNECTION);
+    }
+
+
+    private void waitFor(int waitTarget, long timeout) throws 
InterruptedException {
+        synchronized (stream) {
+            if (waitingFor != 0) {
+                throw new 
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
+                        stream.getConnectionId(), stream.getIdentifier()));
+            }
+
+            waitingFor = waitTarget;
+
+            if (timeout < 0) {
+                stream.wait();
+            } else {
+                stream.wait(timeout);
+            }
+
+            waitingFor = 0;
+        }
+    }
+
+
+    private void waitForNonBlocking(int waitTarget) {
+        synchronized (stream) {
+            if (waitingFor == 0) {
+                waitingFor = waitTarget;
+            } else if (waitingFor == waitTarget) {
+                // NO-OP
+                // Non-blocking post-processing may attempt to flush
+            } else {
+                throw new 
IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
+                        stream.getConnectionId(), stream.getIdentifier()));
+            }
+
+        }
+    }
+
+
+    private void notify(int notifyTarget) {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("windowAllocationManager.notify", 
stream.getConnectionId(),
+                    stream.getIdentifier(), Integer.toString(waitingFor), 
Integer.toString(notifyTarget)));
+        }
+
+        synchronized (stream) {
+            if ((notifyTarget & waitingFor) > 0) {
+                if (stream.getCoyoteResponse().getWriteListener() == null) {
+                    // Blocking, so use notify to release StreamOutputBuffer
+                    if (log.isDebugEnabled()) {
+                        
log.debug(sm.getString("windowAllocationManager.notified",
+                                stream.getConnectionId(), 
stream.getIdentifier()));
+                    }
+                    stream.notify();
+                } else {
+                    waitingFor = 0;
+                    // Non-blocking so dispatch
+                    if (log.isDebugEnabled()) {
+                        
log.debug(sm.getString("windowAllocationManager.dispatched",
+                                stream.getConnectionId(), 
stream.getIdentifier()));
+                    }
+                    
stream.getCoyoteResponse().action(ActionCode.DISPATCH_WRITE, null);
+                    // Need to explicitly execute dispatches on the 
StreamProcessor
+                    // as this thread is being processed by an UpgradeProcessor
+                    // which won't see this dispatch
+                    
stream.getCoyoteResponse().action(ActionCode.DISPATCH_EXECUTE, null);
+                }
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to