This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new bc714fd Add asynchronous IO API for NIO bc714fd is described below commit bc714fd2e3fca74d8931533770b6ee064b67287f Author: remm <r...@apache.org> AuthorDate: Wed Apr 10 21:37:48 2019 +0200 Add asynchronous IO API for NIO This uses the concepts from the NIO2 implementation. The HTTP/2 and Websockets async IO code will automatically use it, with the same behavior as when using NIO2. As it seems to work just fine so far and with the next build being far off, I am enabling it by default to get some testing. --- .../apache/tomcat/util/net/AbstractEndpoint.java | 8 + java/org/apache/tomcat/util/net/AprEndpoint.java | 4 + java/org/apache/tomcat/util/net/Nio2Endpoint.java | 3 +- java/org/apache/tomcat/util/net/NioEndpoint.java | 493 +++++++++++++++++---- webapps/docs/changelog.xml | 6 + webapps/docs/config/http.xml | 10 + 6 files changed, 443 insertions(+), 81 deletions(-) diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index 0b8b683..682012c 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -741,6 +741,14 @@ public abstract class AbstractEndpoint<S,U> { public boolean getDaemon() { return daemon; } + /** + * Expose async IO capability. + */ + private boolean useAsyncIO = true; + public void setUseAsyncIO(boolean useAsyncIO) { this.useAsyncIO = useAsyncIO; } + public boolean getUseAsyncIO() { return useAsyncIO; } + + protected abstract boolean getDeferAccept(); diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index b9f15d0..bfb5c55 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -129,6 +129,10 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB public boolean getDeferAccept() { return deferAccept; } + @Override + public boolean getUseAsyncIO() { return false; } + + private boolean ipv6v6only = false; public void setIpv6v6only(boolean ipv6v6only) { this.ipv6v6only = ipv6v6only; } public boolean getIpv6v6only() { return ipv6v6only; } diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index fd31ac1..370934d 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -91,6 +91,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS // ------------------------------------------------------------- Properties + /** * Is deferAccept supported? */ @@ -941,7 +942,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS @Override public boolean hasAsyncIO() { - return true; + return getEndpoint().getUseAsyncIO(); } /** diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 621dd63..74b0b88 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -27,7 +27,9 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.Channel; +import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; +import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -37,6 +39,7 @@ import java.nio.channels.WritableByteChannel; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -353,7 +356,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> serverSock = null; } - // ------------------------------------------------------ Protected Methods public NioSelectorPool getSelectorPool() { @@ -382,7 +384,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { - //disable blocking, APR style, we are gonna be polling it + // Disable blocking, polling will be used socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); @@ -533,7 +535,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { - //we are registering the key to start with, reset the fairness counter. + // We are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); @@ -566,7 +568,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> new SynchronizedQueue<>(); private volatile boolean close = false; - private long nextExpiration = 0;//optimize expiration handling + // Optimize expiration handling + private long nextExpiration = 0; private AtomicLong wakeupCounter = new AtomicLong(0); @@ -578,7 +581,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public int getKeyCount() { return keyCount; } - public Selector getSelector() { return selector;} + public Selector getSelector() { return selector; } /** * Destroy the poller. @@ -593,7 +596,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private void addEvent(PollerEvent event) { events.offer(event); - if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); + if (wakeupCounter.incrementAndGet() == 0) { + selector.wakeup(); + } } /** @@ -608,12 +613,15 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> */ public void add(final NioChannel socket, final int interestOps) { PollerEvent r = eventCache.pop(); - if ( r==null) r = new PollerEvent(socket,null,interestOps); - else r.reset(socket,null,interestOps); + if (r == null) { + r = new PollerEvent(socket, null, interestOps); + } else { + r.reset(socket, null, interestOps); + } addEvent(r); if (close) { - NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper)socket.getAttachment(); - processSocket(ka, SocketEvent.STOP, false); + NioSocketWrapper socketWrapper = (NioSocketWrapper) socket.getAttachment(); + processSocket(socketWrapper, SocketEvent.STOP, false); } } @@ -650,17 +658,20 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> */ public void register(final NioChannel socket) { socket.setPoller(this); - NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); - socket.setSocketWrapper(ka); - ka.setPoller(this); - ka.setReadTimeout(getConnectionTimeout()); - ka.setWriteTimeout(getConnectionTimeout()); - ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); - ka.setSecure(isSSLEnabled()); + NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this); + socket.setSocketWrapper(socketWrapper); + socketWrapper.setPoller(this); + socketWrapper.setReadTimeout(getConnectionTimeout()); + socketWrapper.setWriteTimeout(getConnectionTimeout()); + socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); + socketWrapper.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); - ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. - if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); - else r.reset(socket,ka,OP_REGISTER); + socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. + if (r == null) { + r = new PollerEvent(socket, socketWrapper, OP_REGISTER); + } else { + r.reset(socket, socketWrapper, OP_REGISTER); + } addEvent(r); } @@ -736,8 +747,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { - //if we are here, means we have other stuff to do - //do a non blocking select + // If we are here, means we have other stuff to do + // Do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); @@ -759,7 +770,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } - //either we timed out or we woke up, process events first + // Either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = @@ -777,34 +788,38 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> iterator.remove(); processKey(sk, attachment); } - }//while + } - //process timeouts + // Process timeouts timeout(keyCount,hasEvents); - }//while + } getStopLatch().countDown(); } - protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { + protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) { try { - if ( close ) { + if (close) { cancelledKey(sk); - } else if ( sk.isValid() && attachment != null ) { - if (sk.isReadable() || sk.isWritable() ) { - if ( attachment.getSendfileData() != null ) { - processSendfile(sk,attachment, false); + } else if (sk.isValid() && socketWrapper != null) { + if (sk.isReadable() || sk.isWritable()) { + if ( socketWrapper.getSendfileData() != null ) { + processSendfile(sk, socketWrapper, false); } else { - unreg(sk, attachment, sk.readyOps()); + unreg(sk, socketWrapper, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { - if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { + if (socketWrapper.readOperation != null) { + getExecutor().execute(socketWrapper.readOperation); + } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { - if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { + if (socketWrapper.writeOperation != null) { + getExecutor().execute(socketWrapper.writeOperation); + } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } @@ -814,10 +829,10 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } } } else { - //invalid key + // Invalid key cancelledKey(sk); } - } catch ( CancelledKeyException ckx ) { + } catch (CancelledKeyException ckx) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); @@ -934,14 +949,14 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } } - protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int readyOps) { - //this is a must, so that we don't have multiple threads messing with the socket - reg(sk,attachment,sk.interestOps()& (~readyOps)); + protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) { + // This is a must, so that we don't have multiple threads messing with the socket + reg(sk, socketWrapper, sk.interestOps() & (~readyOps)); } - protected void reg(SelectionKey sk, NioSocketWrapper attachment, int intops) { + protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) { sk.interestOps(intops); - attachment.interestOps(intops); + socketWrapper.interestOps(intops); } protected void timeout(int keyCount, boolean hasEvents) { @@ -956,39 +971,49 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) { return; } - //timeout int keycount = 0; try { for (SelectionKey key : selector.keys()) { keycount++; try { - NioSocketWrapper ka = (NioSocketWrapper) key.attachment(); - if ( ka == null ) { - cancelledKey(key); //we don't support any keys without attachments + NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); + if ( socketWrapper == null ) { + // We don't support any keys without attachments + cancelledKey(key); } else if (close) { key.interestOps(0); - ka.interestOps(0); //avoid duplicate stop calls - processKey(key,ka); - } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ || - (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { + // Avoid duplicate stop calls + socketWrapper.interestOps(0); + processKey(key,socketWrapper); + } else if ((socketWrapper.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ || + (socketWrapper.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { boolean isTimedOut = false; + boolean readTimeout = false; + boolean writeTimeout = false; // Check for read timeout - if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { - long delta = now - ka.getLastRead(); - long timeout = ka.getReadTimeout(); + if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { + long delta = now - socketWrapper.getLastRead(); + long timeout = socketWrapper.getReadTimeout(); isTimedOut = timeout > 0 && delta > timeout; + readTimeout = true; } // Check for write timeout - if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { - long delta = now - ka.getLastWrite(); - long timeout = ka.getWriteTimeout(); + if (!isTimedOut && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { + long delta = now - socketWrapper.getLastWrite(); + long timeout = socketWrapper.getWriteTimeout(); isTimedOut = timeout > 0 && delta > timeout; + writeTimeout = true; } if (isTimedOut) { key.interestOps(0); - ka.interestOps(0); //avoid duplicate timeout calls - ka.setError(new SocketTimeoutException()); - if (!processSocket(ka, SocketEvent.ERROR, true)) { + // Avoid duplicate timeout calls + socketWrapper.interestOps(0); + socketWrapper.setError(new SocketTimeoutException()); + if (readTimeout && socketWrapper.readOperation != null) { + getExecutor().execute(socketWrapper.readOperation); + } else if (writeTimeout && socketWrapper.writeOperation != null) { + getExecutor().execute(socketWrapper.writeOperation); + } else if (!processSocket(socketWrapper, SocketEvent.ERROR, true)) { cancelledKey(key); } } @@ -996,12 +1021,13 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> }catch ( CancelledKeyException ckx ) { cancelledKey(key); } - }//for + } } catch (ConcurrentModificationException cme) { // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943 log.warn(sm.getString("endpoint.nio.timeoutCme"), cme); } - long prevExp = nextExpiration; //for logging purposes only + // For logging purposes only + long prevExp = nextExpiration; nextExpiration = System.currentTimeMillis() + socketProperties.getTimeoutInterval(); if (log.isTraceEnabled()) { @@ -1014,7 +1040,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } } - // ---------------------------------------------------- Key Attachment Class + // --------------------------------------------------- Socket Wrapper Class + public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> { private final NioSelectorPool pool; @@ -1023,6 +1050,10 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private int interestOps = 0; private CountDownLatch readLatch = null; private CountDownLatch writeLatch = null; + private final Semaphore readPending; + private OperationState<?> readOperation = null; + private final Semaphore writePending; + private OperationState<?> writeOperation = null; private volatile SendfileData sendfileData = null; private volatile long lastRead = System.currentTimeMillis(); private volatile long lastWrite = lastRead; @@ -1030,41 +1061,54 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); + if (endpoint.getUseAsyncIO()) { + readPending = new Semaphore(1); + writePending = new Semaphore(1); + } else { + readPending = null; + writePending = null; + } pool = endpoint.getSelectorPool(); socketBufferHandler = channel.getBufHandler(); } - public Poller getPoller() { return poller;} - public void setPoller(Poller poller){this.poller = poller;} - public int interestOps() { return interestOps;} + public Poller getPoller() { return poller; } + public void setPoller(Poller poller){this.poller = poller; } + public int interestOps() { return interestOps; } public int interestOps(int ops) { this.interestOps = ops; return ops; } public CountDownLatch getReadLatch() { return readLatch; } public CountDownLatch getWriteLatch() { return writeLatch; } protected CountDownLatch resetLatch(CountDownLatch latch) { - if ( latch==null || latch.getCount() == 0 ) return null; - else throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero")); + if (latch==null || latch.getCount() == 0) { + return null; + } else { + throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero")); + } } public void resetReadLatch() { readLatch = resetLatch(readLatch); } public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); } protected CountDownLatch startLatch(CountDownLatch latch, int cnt) { - if ( latch == null || latch.getCount() == 0 ) { + if (latch == null || latch.getCount() == 0) { return new CountDownLatch(cnt); + } else { + throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero")); } - else throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero")); } - public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);} - public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);} + public void startReadLatch(int cnt) { readLatch = startLatch(readLatch, cnt); } + public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch, cnt); } protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException { - if ( latch == null ) throw new IllegalStateException(sm.getString("endpoint.nio.nullLatch")); + if (latch == null) { + throw new IllegalStateException(sm.getString("endpoint.nio.nullLatch")); + } // Note: While the return value is ignored if the latch does time // out, logic further up the call stack will trigger a // SocketTimeoutException - latch.await(timeout,unit); + latch.await(timeout, unit); } - public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);} - public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);} + public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch, timeout, unit); } + public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch, timeout, unit); } public void setSendfileData(SendfileData sf) { this.sendfileData = sf;} public SendfileData getSendfileData() { return this.sendfileData;} @@ -1074,7 +1118,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public void updateLastRead() { lastRead = System.currentTimeMillis(); } public long getLastRead() { return lastRead; } - @Override public boolean isReadyForRead() throws IOException { socketBufferHandler.configureReadBufferForRead(); @@ -1189,12 +1232,11 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> // Ignore } try { - NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper) channel - .getAttachment(); - if (att == null) { + NioSocketWrapper socketWrapper = (NioSocketWrapper) channel.getAttachment(); + if (socketWrapper == null) { throw new IOException(sm.getString("endpoint.nio.keyMustBeCancelled")); } - nRead = pool.read(to, channel, selector, att.getReadTimeout()); + nRead = pool.read(to, channel, selector, socketWrapper.getReadTimeout()); } finally { if (selector != null) { pool.put(selector); @@ -1354,6 +1396,296 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public void setAppReadBufHandler(ApplicationBufferHandler handler) { getSocket().setAppReadBufHandler(handler); } + + @Override + public boolean hasAsyncIO() { + // The semaphores are only created if async IO is enabled + return (readPending != null); + } + + /** + * Internal state tracker for scatter/gather operations. + */ + private class OperationState<A> implements Runnable { + private final boolean read; + private final ByteBuffer[] buffers; + private final int offset; + private final int length; + private final A attachment; + private final BlockingMode block; + private final CompletionCheck check; + private final CompletionHandler<Long, ? super A> handler; + private final Semaphore semaphore; + private final VectoredIOCompletionHandler<A> completion; + private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { + this.read = read; + this.buffers = buffers; + this.offset = offset; + this.length = length; + this.block = block; + this.attachment = attachment; + this.check = check; + this.handler = handler; + this.semaphore = semaphore; + this.completion = completion; + } + private volatile boolean inline = true; + private volatile long nBytes = 0; + private volatile CompletionState state = CompletionState.PENDING; + + @Override + public void run() { + // Perform the IO operation + // Called from the poller to continue the IO operation + long nBytes = 0; + if (getError() == null) { + try { + if (read) { + nBytes = getSocket().read(buffers, offset, length); + } else { + nBytes = getSocket().write(buffers, offset, length); + } + } catch (IOException e) { + setError(e); + } + } + if (nBytes > 0) { + // The bytes read are only updated in the completion handler + completion.completed(Long.valueOf(nBytes), this); + } else if (nBytes < 0 || getError() != null) { + IOException error = getError(); + if (error == null) { + error = new EOFException(); + } + completion.failed(error, this); + } else { + // As soon as the operation uses the poller, it is no longer inline + inline = false; + if (read) { + registerReadInterest(); + } else { + registerWriteInterest(); + } + } + } + + } + + @Override + public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } + if (timeout == -1) { + timeout = toTimeout(getReadTimeout()); + } else if (unit.toMillis(timeout) != getReadTimeout()) { + setReadTimeout(unit.toMillis(timeout)); + } + if (block != BlockingMode.NON_BLOCK) { + try { + if (!readPending.tryAcquire(timeout, unit)) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } else { + if (!readPending.tryAcquire()) { + return CompletionState.NOT_DONE; + } + } + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); + OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, + timeout, unit, attachment, check, handler, readPending, completion); + readOperation = state; + long nBytes = 0; + if (!socketBufferHandler.isReadBufferEmpty()) { + // There is still data inside the main read buffer, use it to fill out the destination buffers + // Note: It is not necessary to put this code in the completion handler + socketBufferHandler.configureReadBufferForRead(); + for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { + nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]); + } + if (nBytes > 0) { + completion.completed(Long.valueOf(nBytes), state); + } + } + if (nBytes == 0) { + state.run(); + } + if (block == BlockingMode.BLOCK) { + synchronized (state) { + if (state.state == CompletionState.PENDING) { + try { + state.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } + } + } + return state.state; + } + + @Override + public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } + if (timeout == -1) { + timeout = toTimeout(getWriteTimeout()); + } else if (unit.toMillis(timeout) != getWriteTimeout()) { + setWriteTimeout(unit.toMillis(timeout)); + } + if (block != BlockingMode.NON_BLOCK) { + try { + if (!writePending.tryAcquire(timeout, unit)) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } else { + if (!writePending.tryAcquire()) { + return CompletionState.NOT_DONE; + } + } + if (!socketBufferHandler.isWriteBufferEmpty()) { + // First flush the main buffer as needed + try { + doWrite(true); + } catch (IOException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); + OperationState<A> state = new OperationState<>(false, srcs, offset, length, block, + timeout, unit, attachment, check, handler, writePending, completion); + writeOperation = state; + // It should be less necessary to check the buffer state as it is easy to flush before + state.run(); + if (block == BlockingMode.BLOCK) { + synchronized (state) { + if (state.state == CompletionState.PENDING) { + try { + state.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } + } + } + return state.state; + } + + private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { + @Override + public void completed(Long nBytes, OperationState<A> state) { + if (nBytes.longValue() < 0) { + failed(new EOFException(), state); + } else { + state.nBytes += nBytes.longValue(); + CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE; + boolean complete = true; + boolean completion = true; + if (state.check != null) { + switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) { + case CONTINUE: + complete = false; + break; + case DONE: + break; + case NONE: + completion = false; + break; + } + } + if (complete) { + boolean notify = false; + state.semaphore.release(); + if (state.read) { + readOperation = null; + } else { + writeOperation = null; + } + if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { + notify = true; + } else { + state.state = currentState; + } + if (completion && state.handler != null) { + state.handler.completed(Long.valueOf(state.nBytes), state.attachment); + } + if (notify) { + synchronized (state) { + state.state = currentState; + state.notify(); + } + } + } else { + state.run(); + } + } + } + @Override + public void failed(Throwable exc, OperationState<A> state) { + IOException ioe; + if (exc instanceof InterruptedByTimeoutException) { + ioe = new SocketTimeoutException(); + } else if (exc instanceof IOException) { + ioe = (IOException) exc; + } else { + ioe = new IOException(exc); + } + setError(ioe); + boolean notify = false; + state.semaphore.release(); + if (state.read) { + readOperation = null; + } else { + writeOperation = null; + } + if (state.block == BlockingMode.BLOCK) { + notify = true; + } else { + state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; + } + if (state.handler != null) { + state.handler.failed(ioe, state.attachment); + } + if (notify) { + synchronized (state) { + state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; + state.notify(); + } + } + } + } + } @@ -1443,6 +1775,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } // ----------------------------------------------- SendfileData Inner Class + /** * SendfileData class. */ diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 7fef23d..fc07b86 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -50,6 +50,12 @@ <update> Add vectoring for NIO in the base and SSL channels. (remm) </update> + <add> + Add asynchronous IO from NIO2 to the NIO connector, with support for + the async IO implementations for HTTP/2 and Websockets. The + <code>useAsyncIO</code> boolean attribute on the Connector element + allows disabling usage of the asynchronous IO API. (remm) + </add> </changelog> </subsection> </section> diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml index f3e3f7a..9e68e39 100644 --- a/webapps/docs/config/http.xml +++ b/webapps/docs/config/http.xml @@ -760,6 +760,11 @@ default value is <code>1000</code> milliseconds.</p> </attribute> + <attribute name="useAsyncIO" required="false"> + <p>(bool)Use this attribute to enable or disable usage of the + asynchronous IO API. The default value is <code>true</code>.</p> + </attribute> + <attribute name="useSendfile" required="false"> <p>(bool)Use this attribute to enable or disable sendfile capability. The default value is <code>true</code>. Note that the use of sendfile @@ -899,6 +904,11 @@ <attributes> + <attribute name="useAsyncIO" required="false"> + <p>(bool)Use this attribute to enable or disable usage of the + asynchronous IO API. The default value is <code>true</code>.</p> + </attribute> + <attribute name="useSendfile" required="false"> <p>(bool)Use this attribute to enable or disable sendfile capability. The default value is <code>true</code>. Note that the use of sendfile --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org