Author: remm
Date: Thu Mar 9 15:27:10 2017
New Revision: 1786186
URL: http://svn.apache.org/viewvc?rev=1786186&view=rev
Log:
- Pick up an old experiment with IO and NIO2. I decided to revisit it as the topic of TomcatCon came along.
- Start using it for HTTP/2 output. It seems to work (h2load, testsuite, browser).
- After testing it with h2load, it does provide a benefit over vanilla NIO2 with only a limited amount of code, so contribute it.
- Add some plumbing to allow extending the HTTP/2 upgrade handler to replace its IO, and a minor change to the API to pass the socket wrapper to determine the capabilities of the endpoint. Since it seems to work and NIO2 isn't the default, enable it with NIO2. Although I left it enabled for now, it may be better to disable it with SSL (another reason to pass around the socket wrapper).
- Ultimately, the strategy is the exact opposite of Coyote (= zero GC): the protocol handler gets control of everything through the read/write API (syncing, blocking, etc.) and encourages GC abuse to minimize blocking.
- If this is too big a change, I can revert it and discuss it first. Although it was discussed in 2015, that was a long time ago.
Added: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java?rev=1786186&r1=1786185&r2=1786186&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java Thu Mar 9 15:27:10 2017 @@ -70,13 +70,14 @@ public interface UpgradeProtocol { /** + * @param socketWrapper The socket * @param adapter The Adapter to use to configure the new upgrade handler * @param request A copy (may be incomplete) of the request that triggered * the upgrade * * @return An instance of the HTTP upgrade handler for this protocol */ - public InternalHttpUpgradeHandler getInternalUpgradeHandler(Adapter adapter, Request request); + public InternalHttpUpgradeHandler getInternalUpgradeHandler(SocketWrapperBase<?> socketWrapper, Adapter adapter, Request request); /** Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1786186&r1=1786185&r2=1786186&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Thu Mar 9 15:27:10 2017 @@ -455,7 +455,7 @@ public class Http11Processor extends Abs InternalHttpUpgradeHandler upgradeHandler = 
upgradeProtocol.getInternalUpgradeHandler( - getAdapter(), cloneRequest(request)); + socketWrapper, getAdapter(), cloneRequest(request)); UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null); action(ActionCode.UPGRADE, upgradeToken); return SocketState.UPGRADING; Added: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1786186&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java (added) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Thu Mar 9 15:27:10 2017 @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.coyote.http2; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.coyote.Adapter; +import org.apache.coyote.ProtocolException; +import org.apache.coyote.Request; +import org.apache.coyote.Response; +import org.apache.coyote.http2.HpackEncoder.State; +import org.apache.tomcat.util.net.SocketWrapperBase; +import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; + +public class Http2AsyncUpgradeHandler extends Http2UpgradeHandler { + + private static final ByteBuffer[] BYTEBUFFER_ARRAY = new ByteBuffer[0]; + private Throwable error = null; + private IOException applicationIOE = null; + + public Http2AsyncUpgradeHandler(Adapter adapter, Request coyoteRequest) { + super (adapter, coyoteRequest); + } + + private CompletionHandler<Long, Void> errorCompletion = new CompletionHandler<Long, Void>() { + @Override + public void completed(Long result, Void attachment) { + } + @Override + public void failed(Throwable t, Void attachment) { + error = t; + } + }; + private CompletionHandler<Long, Void> applicationErrorCompletion = new CompletionHandler<Long, Void>() { + @Override + public void completed(Long result, Void attachment) { + } + @Override + public void failed(Throwable t, Void attachment) { + if (t instanceof IOException) { + applicationIOE = (IOException) t; + } + error = t; + } + }; + + @Override + protected PingManager getPingManager() { + return new AsyncPingManager(); + } + + @Override + protected void writeSettings() { + // Send the initial settings frame + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(localSettings.getSettingsFrameForPending())); + if (error != null) { + String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); + if (log.isDebugEnabled()) { 
+ log.debug(msg); + } + throw new ProtocolException(msg, error); + } + } + + + @Override + void sendStreamReset(StreamException se) throws IOException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.rst.debug", connectionId, + Integer.toString(se.getStreamId()), se.getError())); + } + // Write a RST frame + byte[] rstFrame = new byte[13]; + // Length + ByteUtil.setThreeBytes(rstFrame, 0, 4); + // Type + rstFrame[3] = FrameType.RST.getIdByte(); + // No flags + // Stream ID + ByteUtil.set31Bits(rstFrame, 5, se.getStreamId()); + // Payload + ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode()); + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(rstFrame)); + handleAsyncException(); + } + + + @Override + protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) + throws IOException { + byte[] fixedPayload = new byte[8]; + ByteUtil.set31Bits(fixedPayload, 0, maxStreamId); + ByteUtil.setFourBytes(fixedPayload, 4, errorCode); + int len = 8; + if (debugMsg != null) { + len += debugMsg.length; + } + byte[] payloadLength = new byte[3]; + ByteUtil.setThreeBytes(payloadLength, 0, len); + if (debugMsg != null) { + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY), ByteBuffer.wrap(fixedPayload), ByteBuffer.wrap(debugMsg)); + } else { + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY), ByteBuffer.wrap(fixedPayload)); + } + handleAsyncException(); + } + + + @Override + void writeHeaders(Stream stream, Response coyoteResponse, int payloadSize) + throws IOException { + if (log.isDebugEnabled()) { + 
log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId, + stream.getIdentifier())); + } + + if (!stream.canWrite()) { + return; + } + + prepareHeaders(coyoteResponse); + + boolean first = true; + State state = null; + ArrayList<ByteBuffer> bufs = new ArrayList<>(); + // This ensures the Stream processing thread has control of the socket. + while (state != State.COMPLETE) { + byte[] header = new byte[9]; + ByteBuffer target = ByteBuffer.allocate(payloadSize); + state = getHpackEncoder().encode(coyoteResponse.getMimeHeaders(), target); + target.flip(); + ByteUtil.setThreeBytes(header, 0, target.limit()); + if (first) { + first = false; + header[3] = FrameType.HEADERS.getIdByte(); + if (stream.getOutputBuffer().hasNoBody()) { + header[4] = FLAG_END_OF_STREAM; + } + } else { + header[3] = FrameType.CONTINUATION.getIdByte(); + } + if (state == State.COMPLETE) { + header[4] += FLAG_END_OF_HEADERS; + } + if (log.isDebugEnabled()) { + log.debug(target.limit() + " bytes"); + } + ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); + bufs.add(ByteBuffer.wrap(header)); + bufs.add(target); + } + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, + bufs.toArray(BYTEBUFFER_ARRAY)); + handleAsyncException(); + } + + + @Override + protected void writePushHeaders(Stream stream, int pushedStreamId, Request coyoteRequest, int payloadSize) + throws IOException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.writePushHeaders", connectionId, + stream.getIdentifier(), Integer.toString(pushedStreamId))); + } + // This ensures the Stream processing thread has control of the socket. 
+ boolean first = true; + State state = null; + ArrayList<ByteBuffer> bufs = new ArrayList<>(); + byte[] pushedStreamIdBytes = new byte[4]; + ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId); + while (state != State.COMPLETE) { + byte[] header = new byte[9]; + ByteBuffer target = ByteBuffer.allocate(payloadSize); + target.put(pushedStreamIdBytes); + state = getHpackEncoder().encode(coyoteRequest.getMimeHeaders(), target); + target.flip(); + ByteUtil.setThreeBytes(header, 0, target.limit()); + if (first) { + first = false; + header[3] = FrameType.PUSH_PROMISE.getIdByte(); + } else { + header[3] = FrameType.CONTINUATION.getIdByte(); + } + if (state == State.COMPLETE) { + header[4] += FLAG_END_OF_HEADERS; + } + if (log.isDebugEnabled()) { + log.debug(target.limit() + " bytes"); + } + ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); + bufs.add(ByteBuffer.wrap(header)); + bufs.add(target); + } + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, + bufs.toArray(BYTEBUFFER_ARRAY)); + handleAsyncException(); + } + + + @Override + void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws IOException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.writeBody", connectionId, stream.getIdentifier(), + Integer.toString(len))); + } + // Need to check this now since sending end of stream will change this. 
+ boolean writeable = stream.canWrite(); + byte[] header = new byte[9]; + ByteUtil.setThreeBytes(header, 0, len); + header[3] = FrameType.DATA.getIdByte(); + if (finished) { + header[4] = FLAG_END_OF_STREAM; + stream.sentEndOfStream(); + if (!stream.isActive()) { + activeRemoteStreamCount.decrementAndGet(); + } + } + if (writeable) { + ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); + int orgLimit = data.limit(); + data.limit(data.position() + len); + socketWrapper.write(BlockingMode.BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, + ByteBuffer.wrap(header), data); + data.limit(orgLimit); + handleAsyncException(); + } + } + + + @Override + void writeWindowUpdate(Stream stream, int increment, boolean applicationInitiated) + throws IOException { + if (!stream.canWrite()) { + return; + } + // Build window update frame for stream 0 + byte[] frame = new byte[13]; + ByteUtil.setThreeBytes(frame, 0, 4); + frame[3] = FrameType.WINDOW_UPDATE.getIdByte(); + ByteUtil.set31Bits(frame, 9, increment); + // Change stream Id + byte[] frame2 = new byte[13]; + ByteUtil.setThreeBytes(frame2, 0, 4); + frame2[3] = FrameType.WINDOW_UPDATE.getIdByte(); + ByteUtil.set31Bits(frame2, 9, increment); + ByteUtil.set31Bits(frame2, 5, stream.getIdentifier().intValue()); + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(frame), ByteBuffer.wrap(frame2)); + handleAsyncException(); + } + + + @Override + public void settingsEnd(boolean ack) throws IOException { + if (ack) { + if (!localSettings.ack()) { + // Ack was unexpected + log.warn(sm.getString("upgradeHandler.unexpectedAck", connectionId, getIdentifier())); + } + } else { + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + 
ByteBuffer.wrap(SETTINGS_ACK)); + } + handleAsyncException(); + } + + + protected void handleAsyncException() throws IOException { + if (applicationIOE != null) { + handleAppInitiatedIOException(applicationIOE); + } else if (error != null) { + throw new IOException(error); + } + } + + + protected class AsyncPingManager extends PingManager { + @Override + public void sendPing(boolean force) throws IOException { + long now = System.nanoTime(); + if (force || now - lastPingNanoTime > pingIntervalNano) { + lastPingNanoTime = now; + byte[] payload = new byte[8]; + int sentSequence = ++sequence; + PingRecord pingRecord = new PingRecord(sentSequence, now); + inflightPings.add(pingRecord); + ByteUtil.set31Bits(payload, 4, sentSequence); + socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(PING), ByteBuffer.wrap(payload)); + handleAsyncException(); + } + } + + @Override + public void receivePing(byte[] payload, boolean ack) throws IOException { + if (ack) { + super.receivePing(payload, ack); + } else { + // Client originated ping. Echo it back. 
+ socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS, + null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion, + ByteBuffer.wrap(PING_ACK), ByteBuffer.wrap(payload)); + handleAsyncException(); + } + } + + } + +} Propchange: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java?rev=1786186&r1=1786185&r2=1786186&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java Thu Mar 9 15:27:10 2017 @@ -92,15 +92,17 @@ public class Http2Protocol implements Up @Override public Processor getProcessor(SocketWrapperBase<?> socketWrapper, Adapter adapter) { UpgradeProcessorInternal processor = new UpgradeProcessorInternal(socketWrapper, - new UpgradeToken(getInternalUpgradeHandler(adapter, null), null, null)); + new UpgradeToken(getInternalUpgradeHandler(socketWrapper, adapter, null), null, null)); return processor; } @Override - public InternalHttpUpgradeHandler getInternalUpgradeHandler(Adapter adapter, - Request coyoteRequest) { - Http2UpgradeHandler result = new Http2UpgradeHandler(adapter, coyoteRequest); + public InternalHttpUpgradeHandler getInternalUpgradeHandler(SocketWrapperBase<?> socketWrapper, + Adapter adapter, Request coyoteRequest) { + Http2UpgradeHandler result = (socketWrapper.hasAsyncIO()) + ? 
new Http2AsyncUpgradeHandler(adapter, coyoteRequest) + : new Http2UpgradeHandler(adapter, coyoteRequest); result.setReadTimeout(getReadTimeout()); result.setKeepAliveTimeout(getKeepAliveTimeout()); Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1786186&r1=1786185&r2=1786186&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Mar 9 15:27:10 2017 @@ -76,36 +76,36 @@ import org.apache.tomcat.util.res.String class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeHandler, Input, Output { - private static final Log log = LogFactory.getLog(Http2UpgradeHandler.class); - private static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class); + protected static final Log log = LogFactory.getLog(Http2UpgradeHandler.class); + protected static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class); private static final AtomicInteger connectionIdGenerator = new AtomicInteger(0); private static final Integer STREAM_ID_ZERO = Integer.valueOf(0); - private static final int FLAG_END_OF_STREAM = 1; - private static final int FLAG_END_OF_HEADERS = 4; + protected static final int FLAG_END_OF_STREAM = 1; + protected static final int FLAG_END_OF_HEADERS = 4; - private static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00}; - private static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00 }; + protected static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00}; + protected static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00 }; - private static final byte[] SETTINGS_ACK = { 0x00, 0x00, 
0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 }; + protected static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 }; - private static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 }; + protected static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 }; - private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings"; + protected static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings"; private static final HeaderSink HEADER_SINK = new HeaderSink(); - private final String connectionId; + protected final String connectionId; - private final Adapter adapter; - private volatile SocketWrapperBase<?> socketWrapper; - private volatile SSLSupport sslSupport; + protected final Adapter adapter; + protected volatile SocketWrapperBase<?> socketWrapper; + protected volatile SSLSupport sslSupport; - private volatile Http2Parser parser; + protected volatile Http2Parser parser; // Simple state machine (sequence of states) - private AtomicReference<ConnectionState> connectionState = + protected AtomicReference<ConnectionState> connectionState = new AtomicReference<>(ConnectionState.NEW); private volatile long pausedNanoTime = Long.MAX_VALUE; @@ -113,12 +113,12 @@ class Http2UpgradeHandler extends Abstra * Remote settings are settings defined by the client and sent to Tomcat * that Tomcat must use when communicating with the client. */ - private final ConnectionSettingsRemote remoteSettings; + protected final ConnectionSettingsRemote remoteSettings; /** * Local settings are settings defined by Tomcat and sent to the client that * the client must use when communicating with Tomcat. 
*/ - private final ConnectionSettingsLocal localSettings; + protected final ConnectionSettingsLocal localSettings; private HpackDecoder hpackDecoder; private HpackEncoder hpackEncoder; @@ -129,22 +129,22 @@ class Http2UpgradeHandler extends Abstra private long writeTimeout = Http2Protocol.DEFAULT_WRITE_TIMEOUT; private final Map<Integer,Stream> streams = new HashMap<>(); - private final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); + protected final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); private volatile int maxRemoteStreamId = 0; // Start at -1 so the 'add 2' logic in closeIdleStreams() works private volatile int maxActiveRemoteStreamId = -1; private volatile int maxProcessedStreamId; private final AtomicInteger nextLocalStreamId = new AtomicInteger(2); - private final PingManager pingManager = new PingManager(); + protected final PingManager pingManager = getPingManager(); private volatile int newStreamsSinceLastPrune = 0; // Tracking for when the connection is blocked (windowSize < 1) private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>(); private long backLogSize = 0; // Stream concurrency control - private int maxConcurrentStreamExecution = Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION; - private AtomicInteger streamConcurrency = null; - private Queue<StreamRunnable> queuedRunnable = null; + protected int maxConcurrentStreamExecution = Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION; + protected AtomicInteger streamConcurrency = null; + protected Queue<StreamRunnable> queuedRunnable = null; // Limits private Set<String> allowedTrailerHeaders = Collections.emptySet(); @@ -177,6 +177,9 @@ class Http2UpgradeHandler extends Abstra } } + protected PingManager getPingManager() { + return new PingManager(); + } @Override public void init(WebConnection webConnection) { @@ -226,17 +229,7 @@ class Http2UpgradeHandler extends Abstra } // Send the initial settings frame - try { - byte[] 
settings = localSettings.getSettingsFrameForPending(); - socketWrapper.write(true, settings, 0, settings.length); - socketWrapper.flush(true); - } catch (IOException ioe) { - String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); - if (log.isDebugEnabled()) { - log.debug(msg); - } - throw new ProtocolException(msg, ioe); - } + writeSettings(); // Make sure the client has sent a valid connection preface before we // send the response to the original request over HTTP/2. @@ -245,7 +238,7 @@ class Http2UpgradeHandler extends Abstra } catch (Http2Exception e) { String msg = sm.getString("upgradeHandler.invalidPreface", connectionId); if (log.isDebugEnabled()) { - log.debug(msg); + log.debug(msg, e); } throw new ProtocolException(msg); } @@ -265,8 +258,7 @@ class Http2UpgradeHandler extends Abstra } } - - private void processStreamOnContainerThread(Stream stream) { + protected void processStreamOnContainerThread(Stream stream) { StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper); streamProcessor.setSslSupport(sslSupport); processStreamOnContainerThread(streamProcessor, SocketEvent.OPEN_READ); @@ -323,7 +315,7 @@ class Http2UpgradeHandler extends Abstra try { // There is data to read so use the read timeout while // reading frames. 
- socketWrapper.setReadTimeout(getReadTimeout()); + socketWrapper.setReadTimeout(getReadTimeout()); while (true) { try { if (!parser.readFrame(false)) { @@ -493,7 +485,23 @@ class Http2UpgradeHandler extends Abstra } - private void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) + protected void writeSettings() { + // Send the initial settings frame + try { + byte[] settings = localSettings.getSettingsFrameForPending(); + socketWrapper.write(true, settings, 0, settings.length); + socketWrapper.flush(true); + } catch (IOException ioe) { + String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); + if (log.isDebugEnabled()) { + log.debug(msg); + } + throw new ProtocolException(msg, ioe); + } + } + + + protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) throws IOException { byte[] fixedPayload = new byte[8]; ByteUtil.set31Bits(fixedPayload, 0, maxStreamId); @@ -567,7 +575,7 @@ class Http2UpgradeHandler extends Abstra } - private void prepareHeaders(Response coyoteResponse) { + protected void prepareHeaders(Response coyoteResponse) { MimeHeaders headers = coyoteResponse.getMimeHeaders(); int statusCode = coyoteResponse.getStatus(); @@ -593,7 +601,7 @@ class Http2UpgradeHandler extends Abstra } - private void writePushHeaders(Stream stream, int pushedStreamId, Request coyoteRequest, int payloadSize) + protected void writePushHeaders(Stream stream, int pushedStreamId, Request coyoteRequest, int payloadSize) throws IOException { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.writePushHeaders", connectionId, @@ -633,7 +641,7 @@ class Http2UpgradeHandler extends Abstra } - private HpackEncoder getHpackEncoder() { + protected HpackEncoder getHpackEncoder() { if (hpackEncoder == null) { hpackEncoder = new HpackEncoder(); } @@ -688,7 +696,7 @@ class Http2UpgradeHandler extends Abstra * Note: We can not rely on this exception reaching the socket processor * since the application code may 
swallow it. */ - private void handleAppInitiatedIOException(IOException ioe) throws IOException { + protected void handleAppInitiatedIOException(IOException ioe) throws IOException { close(); throw ioe; } @@ -927,7 +935,7 @@ class Http2UpgradeHandler extends Abstra } - private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { + protected Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { Integer key = Integer.valueOf(streamId); Stream result = streams.get(key); if (result == null && unknownIsError) { @@ -1497,16 +1505,16 @@ class Http2UpgradeHandler extends Abstra } - private class PingManager { + protected class PingManager { // 10 seconds - private final long pingIntervalNano = 10000000000L; + protected final long pingIntervalNano = 10000000000L; - private int sequence = 0; - private long lastPingNanoTime = Long.MIN_VALUE; + protected int sequence = 0; + protected long lastPingNanoTime = Long.MIN_VALUE; - private Queue<PingRecord> inflightPings = new ConcurrentLinkedQueue<>(); - private Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>(); + protected Queue<PingRecord> inflightPings = new ConcurrentLinkedQueue<>(); + protected Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>(); /** * Check to see if a ping was sent recently and, if not, send one. @@ -1557,6 +1565,7 @@ class Http2UpgradeHandler extends Abstra } else { // Client originated ping. Echo it back. 
synchronized (socketWrapper) { + // FIXME: extract socketWrapper.write(true, PING_ACK, 0, PING_ACK.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); @@ -1570,7 +1579,7 @@ class Http2UpgradeHandler extends Abstra } - private static class PingRecord { + protected static class PingRecord { private final int sequence; private final long sentNanoTime; @@ -1590,7 +1599,7 @@ class Http2UpgradeHandler extends Abstra } - private enum ConnectionState { + protected enum ConnectionState { NEW(true), CONNECTED(true), Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1786186&r1=1786185&r2=1786186&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Thu Mar 9 15:27:10 2017 @@ -45,6 +45,13 @@ issues do not "pop up" wrt. others). --> <section name="Tomcat 9.0.0.M19 (markt)" rtext="in development"> + <subsection name="Coyote"> + <changelog> + <fix> + Add async based IO groundwork for HTTP/2. (remm) + </fix> + </changelog> + </subsection> <subsection name="Other"> <changelog> <fix> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org