This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new bc714fd  Add asynchronous IO API for NIO
bc714fd is described below

commit bc714fd2e3fca74d8931533770b6ee064b67287f
Author: remm <r...@apache.org>
AuthorDate: Wed Apr 10 21:37:48 2019 +0200

    Add asynchronous IO API for NIO
    
    This uses the concepts from the NIO2 implementation. The HTTP/2 and
    Websockets async IO code will automatically use it, with the same
    behavior as when using NIO2.
    As it seems to work just fine so far and with the next build being far
    off, I am enabling it by default to get some testing.
---
 .../apache/tomcat/util/net/AbstractEndpoint.java   |   8 +
 java/org/apache/tomcat/util/net/AprEndpoint.java   |   4 +
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  |   3 +-
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 493 +++++++++++++++++----
 webapps/docs/changelog.xml                         |   6 +
 webapps/docs/config/http.xml                       |  10 +
 6 files changed, 443 insertions(+), 81 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java 
b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
index 0b8b683..682012c 100644
--- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
@@ -741,6 +741,14 @@ public abstract class AbstractEndpoint<S,U> {
     public boolean getDaemon() { return daemon; }
 
 
+    /**
+     * Expose async IO capability.
+     */
+    private boolean useAsyncIO = true;
+    public void setUseAsyncIO(boolean useAsyncIO) { this.useAsyncIO = 
useAsyncIO; }
+    public boolean getUseAsyncIO() { return useAsyncIO; }
+
+
     protected abstract boolean getDeferAccept();
 
 
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java 
b/java/org/apache/tomcat/util/net/AprEndpoint.java
index b9f15d0..bfb5c55 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -129,6 +129,10 @@ public class AprEndpoint extends 
AbstractEndpoint<Long,Long> implements SNICallB
     public boolean getDeferAccept() { return deferAccept; }
 
 
+    @Override
+    public boolean getUseAsyncIO() { return false; }
+
+
     private boolean ipv6v6only = false;
     public void setIpv6v6only(boolean ipv6v6only) { this.ipv6v6only = 
ipv6v6only; }
     public boolean getIpv6v6only() { return ipv6v6only; }
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java 
b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index fd31ac1..370934d 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -91,6 +91,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
     // ------------------------------------------------------------- Properties
 
+
     /**
      * Is deferAccept supported?
      */
@@ -941,7 +942,7 @@ public class Nio2Endpoint extends 
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
         @Override
         public boolean hasAsyncIO() {
-            return true;
+            return getEndpoint().getUseAsyncIO();
         }
 
         /**
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java 
b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 621dd63..74b0b88 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -27,7 +27,9 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.Channel;
+import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
+import java.nio.channels.InterruptedByTimeoutException;
 import java.nio.channels.NetworkChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -37,6 +39,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -353,7 +356,6 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         serverSock = null;
     }
 
-
     // ------------------------------------------------------ Protected Methods
 
     public NioSelectorPool getSelectorPool() {
@@ -382,7 +384,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
     protected boolean setSocketOptions(SocketChannel socket) {
         // Process the connection
         try {
-            //disable blocking, APR style, we are gonna be polling it
+            // Disable blocking, polling will be used
             socket.configureBlocking(false);
             Socket sock = socket.socket();
             socketProperties.setProperties(sock);
@@ -533,7 +535,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     } else {
                         final NioSocketWrapper socketWrapper = 
(NioSocketWrapper) key.attachment();
                         if (socketWrapper != null) {
-                            //we are registering the key to start with, reset 
the fairness counter.
+                            // We are registering the key to start with, reset 
the fairness counter.
                             int ops = key.interestOps() | interestOps;
                             socketWrapper.interestOps(ops);
                             key.interestOps(ops);
@@ -566,7 +568,8 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                 new SynchronizedQueue<>();
 
         private volatile boolean close = false;
-        private long nextExpiration = 0;//optimize expiration handling
+        // Optimize expiration handling
+        private long nextExpiration = 0;
 
         private AtomicLong wakeupCounter = new AtomicLong(0);
 
@@ -578,7 +581,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         public int getKeyCount() { return keyCount; }
 
-        public Selector getSelector() { return selector;}
+        public Selector getSelector() { return selector; }
 
         /**
          * Destroy the poller.
@@ -593,7 +596,9 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         private void addEvent(PollerEvent event) {
             events.offer(event);
-            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
+            if (wakeupCounter.incrementAndGet() == 0) {
+                selector.wakeup();
+            }
         }
 
         /**
@@ -608,12 +613,15 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
          */
         public void add(final NioChannel socket, final int interestOps) {
             PollerEvent r = eventCache.pop();
-            if ( r==null) r = new PollerEvent(socket,null,interestOps);
-            else r.reset(socket,null,interestOps);
+            if (r == null) {
+                r = new PollerEvent(socket, null, interestOps);
+            } else {
+                r.reset(socket, null, interestOps);
+            }
             addEvent(r);
             if (close) {
-                NioEndpoint.NioSocketWrapper ka = 
(NioEndpoint.NioSocketWrapper)socket.getAttachment();
-                processSocket(ka, SocketEvent.STOP, false);
+                NioSocketWrapper socketWrapper = (NioSocketWrapper) 
socket.getAttachment();
+                processSocket(socketWrapper, SocketEvent.STOP, false);
             }
         }
 
@@ -650,17 +658,20 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
          */
         public void register(final NioChannel socket) {
             socket.setPoller(this);
-            NioSocketWrapper ka = new NioSocketWrapper(socket, 
NioEndpoint.this);
-            socket.setSocketWrapper(ka);
-            ka.setPoller(this);
-            ka.setReadTimeout(getConnectionTimeout());
-            ka.setWriteTimeout(getConnectionTimeout());
-            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
-            ka.setSecure(isSSLEnabled());
+            NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, 
NioEndpoint.this);
+            socket.setSocketWrapper(socketWrapper);
+            socketWrapper.setPoller(this);
+            socketWrapper.setReadTimeout(getConnectionTimeout());
+            socketWrapper.setWriteTimeout(getConnectionTimeout());
+            
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
+            socketWrapper.setSecure(isSSLEnabled());
             PollerEvent r = eventCache.pop();
-            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER 
turns into.
-            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
-            else r.reset(socket,ka,OP_REGISTER);
+            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what 
OP_REGISTER turns into.
+            if (r == null) {
+                r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
+            } else {
+                r.reset(socket, socketWrapper, OP_REGISTER);
+            }
             addEvent(r);
         }
 
@@ -736,8 +747,8 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     if (!close) {
                         hasEvents = events();
                         if (wakeupCounter.getAndSet(-1) > 0) {
-                            //if we are here, means we have other stuff to do
-                            //do a non blocking select
+                            // If we are here, means we have other stuff to do
+                            // Do a non blocking select
                             keyCount = selector.selectNow();
                         } else {
                             keyCount = selector.select(selectorTimeout);
@@ -759,7 +770,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     log.error(sm.getString("endpoint.nio.selectorLoopError"), 
x);
                     continue;
                 }
-                //either we timed out or we woke up, process events first
+                // Either we timed out or we woke up, process events first
                 if ( keyCount == 0 ) hasEvents = (hasEvents | events());
 
                 Iterator<SelectionKey> iterator =
@@ -777,34 +788,38 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                         iterator.remove();
                         processKey(sk, attachment);
                     }
-                }//while
+                }
 
-                //process timeouts
+                // Process timeouts
                 timeout(keyCount,hasEvents);
-            }//while
+            }
 
             getStopLatch().countDown();
         }
 
-        protected void processKey(SelectionKey sk, NioSocketWrapper 
attachment) {
+        protected void processKey(SelectionKey sk, NioSocketWrapper 
socketWrapper) {
             try {
-                if ( close ) {
+                if (close) {
                     cancelledKey(sk);
-                } else if ( sk.isValid() && attachment != null ) {
-                    if (sk.isReadable() || sk.isWritable() ) {
-                        if ( attachment.getSendfileData() != null ) {
-                            processSendfile(sk,attachment, false);
+                } else if (sk.isValid() && socketWrapper != null) {
+                    if (sk.isReadable() || sk.isWritable()) {
+                        if ( socketWrapper.getSendfileData() != null ) {
+                            processSendfile(sk, socketWrapper, false);
                         } else {
-                            unreg(sk, attachment, sk.readyOps());
+                            unreg(sk, socketWrapper, sk.readyOps());
                             boolean closeSocket = false;
                             // Read goes before write
                             if (sk.isReadable()) {
-                                if (!processSocket(attachment, 
SocketEvent.OPEN_READ, true)) {
+                                if (socketWrapper.readOperation != null) {
+                                    
getExecutor().execute(socketWrapper.readOperation);
+                                } else if (!processSocket(socketWrapper, 
SocketEvent.OPEN_READ, true)) {
                                     closeSocket = true;
                                 }
                             }
                             if (!closeSocket && sk.isWritable()) {
-                                if (!processSocket(attachment, 
SocketEvent.OPEN_WRITE, true)) {
+                                if (socketWrapper.writeOperation != null) {
+                                    
getExecutor().execute(socketWrapper.writeOperation);
+                                } else if (!processSocket(socketWrapper, 
SocketEvent.OPEN_WRITE, true)) {
                                     closeSocket = true;
                                 }
                             }
@@ -814,10 +829,10 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                         }
                     }
                 } else {
-                    //invalid key
+                    // Invalid key
                     cancelledKey(sk);
                 }
-            } catch ( CancelledKeyException ckx ) {
+            } catch (CancelledKeyException ckx) {
                 cancelledKey(sk);
             } catch (Throwable t) {
                 ExceptionUtils.handleThrowable(t);
@@ -934,14 +949,14 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
             }
         }
 
-        protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int 
readyOps) {
-            //this is a must, so that we don't have multiple threads messing 
with the socket
-            reg(sk,attachment,sk.interestOps()& (~readyOps));
+        protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, 
int readyOps) {
+            // This is a must, so that we don't have multiple threads messing 
with the socket
+            reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
         }
 
-        protected void reg(SelectionKey sk, NioSocketWrapper attachment, int 
intops) {
+        protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, 
int intops) {
             sk.interestOps(intops);
-            attachment.interestOps(intops);
+            socketWrapper.interestOps(intops);
         }
 
         protected void timeout(int keyCount, boolean hasEvents) {
@@ -956,39 +971,49 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
             if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < 
nextExpiration) && !close) {
                 return;
             }
-            //timeout
             int keycount = 0;
             try {
                 for (SelectionKey key : selector.keys()) {
                     keycount++;
                     try {
-                        NioSocketWrapper ka = (NioSocketWrapper) 
key.attachment();
-                        if ( ka == null ) {
-                            cancelledKey(key); //we don't support any keys 
without attachments
+                        NioSocketWrapper socketWrapper = (NioSocketWrapper) 
key.attachment();
+                        if ( socketWrapper == null ) {
+                            // We don't support any keys without attachments
+                            cancelledKey(key);
                         } else if (close) {
                             key.interestOps(0);
-                            ka.interestOps(0); //avoid duplicate stop calls
-                            processKey(key,ka);
-                        } else if ((ka.interestOps()&SelectionKey.OP_READ) == 
SelectionKey.OP_READ ||
-                                  (ka.interestOps()&SelectionKey.OP_WRITE) == 
SelectionKey.OP_WRITE) {
+                            // Avoid duplicate stop calls
+                            socketWrapper.interestOps(0);
+                            processKey(key,socketWrapper);
+                        } else if 
((socketWrapper.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
+                                  
(socketWrapper.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                             boolean isTimedOut = false;
+                            boolean readTimeout = false;
+                            boolean writeTimeout = false;
                             // Check for read timeout
-                            if ((ka.interestOps() & SelectionKey.OP_READ) == 
SelectionKey.OP_READ) {
-                                long delta = now - ka.getLastRead();
-                                long timeout = ka.getReadTimeout();
+                            if ((socketWrapper.interestOps() & 
SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+                                long delta = now - socketWrapper.getLastRead();
+                                long timeout = socketWrapper.getReadTimeout();
                                 isTimedOut = timeout > 0 && delta > timeout;
+                                readTimeout = true;
                             }
                             // Check for write timeout
-                            if (!isTimedOut && (ka.interestOps() & 
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
-                                long delta = now - ka.getLastWrite();
-                                long timeout = ka.getWriteTimeout();
+                            if (!isTimedOut && (socketWrapper.interestOps() & 
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
+                                long delta = now - 
socketWrapper.getLastWrite();
+                                long timeout = socketWrapper.getWriteTimeout();
                                 isTimedOut = timeout > 0 && delta > timeout;
+                                writeTimeout = true;
                             }
                             if (isTimedOut) {
                                 key.interestOps(0);
-                                ka.interestOps(0); //avoid duplicate timeout 
calls
-                                ka.setError(new SocketTimeoutException());
-                                if (!processSocket(ka, SocketEvent.ERROR, 
true)) {
+                                // Avoid duplicate timeout calls
+                                socketWrapper.interestOps(0);
+                                socketWrapper.setError(new 
SocketTimeoutException());
+                                if (readTimeout && socketWrapper.readOperation 
!= null) {
+                                    
getExecutor().execute(socketWrapper.readOperation);
+                                } else if (writeTimeout && 
socketWrapper.writeOperation != null) {
+                                    
getExecutor().execute(socketWrapper.writeOperation);
+                                } else if (!processSocket(socketWrapper, 
SocketEvent.ERROR, true)) {
                                     cancelledKey(key);
                                 }
                             }
@@ -996,12 +1021,13 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     }catch ( CancelledKeyException ckx ) {
                         cancelledKey(key);
                     }
-                }//for
+                }
             } catch (ConcurrentModificationException cme) {
                 // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
                 log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
             }
-            long prevExp = nextExpiration; //for logging purposes only
+            // For logging purposes only
+            long prevExp = nextExpiration;
             nextExpiration = System.currentTimeMillis() +
                     socketProperties.getTimeoutInterval();
             if (log.isTraceEnabled()) {
@@ -1014,7 +1040,8 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         }
     }
 
-    // ---------------------------------------------------- Key Attachment 
Class
+    // --------------------------------------------------- Socket Wrapper Class
+
     public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> 
{
 
         private final NioSelectorPool pool;
@@ -1023,6 +1050,10 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         private int interestOps = 0;
         private CountDownLatch readLatch = null;
         private CountDownLatch writeLatch = null;
+        private final Semaphore readPending;
+        private OperationState<?> readOperation = null;
+        private final Semaphore writePending;
+        private OperationState<?> writeOperation = null;
         private volatile SendfileData sendfileData = null;
         private volatile long lastRead = System.currentTimeMillis();
         private volatile long lastWrite = lastRead;
@@ -1030,41 +1061,54 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
             super(channel, endpoint);
+            if (endpoint.getUseAsyncIO()) {
+                readPending = new Semaphore(1);
+                writePending = new Semaphore(1);
+            } else {
+                readPending = null;
+                writePending = null;
+            }
             pool = endpoint.getSelectorPool();
             socketBufferHandler = channel.getBufHandler();
         }
 
-        public Poller getPoller() { return poller;}
-        public void setPoller(Poller poller){this.poller = poller;}
-        public int interestOps() { return interestOps;}
+        public Poller getPoller() { return poller; }
+        public void setPoller(Poller poller){this.poller = poller; }
+        public int interestOps() { return interestOps; }
         public int interestOps(int ops) { this.interestOps  = ops; return ops; 
}
         public CountDownLatch getReadLatch() { return readLatch; }
         public CountDownLatch getWriteLatch() { return writeLatch; }
         protected CountDownLatch resetLatch(CountDownLatch latch) {
-            if ( latch==null || latch.getCount() == 0 ) return null;
-            else throw new 
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
+            if (latch==null || latch.getCount() == 0) {
+                return null;
+            } else {
+                throw new 
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
+            }
         }
         public void resetReadLatch() { readLatch = resetLatch(readLatch); }
         public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
 
         protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
-            if ( latch == null || latch.getCount() == 0 ) {
+            if (latch == null || latch.getCount() == 0) {
                 return new CountDownLatch(cnt);
+            } else {
+                throw new 
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
             }
-            else throw new 
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
         }
-        public void startReadLatch(int cnt) { readLatch = 
startLatch(readLatch,cnt);}
-        public void startWriteLatch(int cnt) { writeLatch = 
startLatch(writeLatch,cnt);}
+        public void startReadLatch(int cnt) { readLatch = 
startLatch(readLatch, cnt); }
+        public void startWriteLatch(int cnt) { writeLatch = 
startLatch(writeLatch, cnt); }
 
         protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit 
unit) throws InterruptedException {
-            if ( latch == null ) throw new 
IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
+            if (latch == null) {
+                throw new 
IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
+            }
             // Note: While the return value is ignored if the latch does time
             //       out, logic further up the call stack will trigger a
             //       SocketTimeoutException
-            latch.await(timeout,unit);
+            latch.await(timeout, unit);
         }
-        public void awaitReadLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(readLatch,timeout,unit);}
-        public void awaitWriteLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+        public void awaitReadLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(readLatch, timeout, unit); }
+        public void awaitWriteLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(writeLatch, timeout, unit); }
 
         public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
         public SendfileData getSendfileData() { return this.sendfileData;}
@@ -1074,7 +1118,6 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         public void updateLastRead() { lastRead = System.currentTimeMillis(); }
         public long getLastRead() { return lastRead; }
 
-
         @Override
         public boolean isReadyForRead() throws IOException {
             socketBufferHandler.configureReadBufferForRead();
@@ -1189,12 +1232,11 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     // Ignore
                 }
                 try {
-                    NioEndpoint.NioSocketWrapper att = 
(NioEndpoint.NioSocketWrapper) channel
-                            .getAttachment();
-                    if (att == null) {
+                    NioSocketWrapper socketWrapper = (NioSocketWrapper) 
channel.getAttachment();
+                    if (socketWrapper == null) {
                         throw new 
IOException(sm.getString("endpoint.nio.keyMustBeCancelled"));
                     }
-                    nRead = pool.read(to, channel, selector, 
att.getReadTimeout());
+                    nRead = pool.read(to, channel, selector, 
socketWrapper.getReadTimeout());
                 } finally {
                     if (selector != null) {
                         pool.put(selector);
@@ -1354,6 +1396,296 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         public void setAppReadBufHandler(ApplicationBufferHandler handler) {
             getSocket().setAppReadBufHandler(handler);
         }
+
+        @Override
+        public boolean hasAsyncIO() {
+            // The semaphores are only created if async IO is enabled
+            return (readPending != null);
+        }
+
+        /**
+         * Internal state tracker for scatter/gather operations.
+         */
+        private class OperationState<A> implements Runnable {
+            private final boolean read;
+            private final ByteBuffer[] buffers;
+            private final int offset;
+            private final int length;
+            private final A attachment;
+            private final BlockingMode block;
+            private final CompletionCheck check;
+            private final CompletionHandler<Long, ? super A> handler;
+            private final Semaphore semaphore;
+            private final VectoredIOCompletionHandler<A> completion;
+            private OperationState(boolean read, ByteBuffer[] buffers, int 
offset, int length,
+                    BlockingMode block, long timeout, TimeUnit unit, A 
attachment,
+                    CompletionCheck check, CompletionHandler<Long, ? super A> 
handler,
+                    Semaphore semaphore, VectoredIOCompletionHandler<A> 
completion) {
+                this.read = read;
+                this.buffers = buffers;
+                this.offset = offset;
+                this.length = length;
+                this.block = block;
+                this.attachment = attachment;
+                this.check = check;
+                this.handler = handler;
+                this.semaphore = semaphore;
+                this.completion = completion;
+            }
+            private volatile boolean inline = true;
+            private volatile long nBytes = 0;
+            private volatile CompletionState state = CompletionState.PENDING;
+
+            @Override
+            public void run() {
+                // Perform the IO operation
+                // Called from the poller to continue the IO operation
+                long nBytes = 0;
+                if (getError() == null) {
+                    try {
+                        if (read) {
+                            nBytes = getSocket().read(buffers, offset, length);
+                        } else {
+                            nBytes = getSocket().write(buffers, offset, 
length);
+                        }
+                    } catch (IOException e) {
+                        setError(e);
+                    }
+                }
+                if (nBytes > 0) {
+                    // The bytes read are only updated in the completion 
handler
+                    completion.completed(Long.valueOf(nBytes), this);
+                } else if (nBytes < 0 || getError() != null) {
+                    IOException error = getError();
+                    if (error == null) {
+                        error = new EOFException();
+                    }
+                    completion.failed(error, this);
+                } else {
+                    // As soon as the operation uses the poller, it is no 
longer inline
+                    inline = false;
+                    if (read) {
+                        registerReadInterest();
+                    } else {
+                        registerWriteInterest();
+                    }
+                }
+            }
+
+        }
+
+        @Override
+        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int 
length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> 
handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toTimeout(getReadTimeout());
+            } else if (unit.toMillis(timeout) != getReadTimeout()) {
+                setReadTimeout(unit.toMillis(timeout));
+            }
+            if (block != BlockingMode.NON_BLOCK) {
+                try {
+                    if (!readPending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), 
attachment);
+                        return CompletionState.ERROR;
+                    }
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            } else {
+                if (!readPending.tryAcquire()) {
+                    return CompletionState.NOT_DONE;
+                }
+            }
+            VectoredIOCompletionHandler<A> completion = new 
VectoredIOCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(true, dsts, offset, 
length, block,
+                    timeout, unit, attachment, check, handler, readPending, 
completion);
+            readOperation = state;
+            long nBytes = 0;
+            if (!socketBufferHandler.isReadBufferEmpty()) {
+                // There is still data inside the main read buffer, use it to 
fill out the destination buffers
+                // Note: It is not necessary to put this code in the 
completion handler
+                socketBufferHandler.configureReadBufferForRead();
+                for (int i = 0; i < length && 
!socketBufferHandler.isReadBufferEmpty(); i++) {
+                    nBytes += transfer(socketBufferHandler.getReadBuffer(), 
dsts[offset + i]);
+                }
+                if (nBytes > 0) {
+                    completion.completed(Long.valueOf(nBytes), state);
+                }
+            }
+            if (nBytes == 0) {
+                state.run();
+            }
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), 
attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
+                }
+            }
+            return state.state;
+        }
+
+        @Override
+        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int 
length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> 
handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toTimeout(getWriteTimeout());
+            } else if (unit.toMillis(timeout) != getWriteTimeout()) {
+                setWriteTimeout(unit.toMillis(timeout));
+            }
+            if (block != BlockingMode.NON_BLOCK) {
+                try {
+                    if (!writePending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), 
attachment);
+                        return CompletionState.ERROR;
+                    }
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            } else {
+                if (!writePending.tryAcquire()) {
+                    return CompletionState.NOT_DONE;
+                }
+            }
+            if (!socketBufferHandler.isWriteBufferEmpty()) {
+                // First flush the main buffer as needed
+                try {
+                    doWrite(true);
+                } catch (IOException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            }
+            VectoredIOCompletionHandler<A> completion = new 
VectoredIOCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(false, srcs, 
offset, length, block,
+                    timeout, unit, attachment, check, handler, writePending, 
completion);
+            writeOperation = state;
+            // It should be less necessary to check the buffer state as it is 
easy to flush before
+            state.run();
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), 
attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
+                }
+            }
+            return state.state;
+        }
+
+        private class VectoredIOCompletionHandler<A> implements 
CompletionHandler<Long, OperationState<A>> {
+            @Override
+            public void completed(Long nBytes, OperationState<A> state) {
+                if (nBytes.longValue() < 0) {
+                    failed(new EOFException(), state);
+                } else {
+                    state.nBytes += nBytes.longValue();
+                    CompletionState currentState = state.inline ? 
CompletionState.INLINE : CompletionState.DONE;
+                    boolean complete = true;
+                    boolean completion = true;
+                    if (state.check != null) {
+                        switch (state.check.callHandler(currentState, 
state.buffers, state.offset, state.length)) {
+                        case CONTINUE:
+                            complete = false;
+                            break;
+                        case DONE:
+                            break;
+                        case NONE:
+                            completion = false;
+                            break;
+                        }
+                    }
+                    if (complete) {
+                        boolean notify = false;
+                        state.semaphore.release();
+                        if (state.read) {
+                            readOperation = null;
+                        } else {
+                            writeOperation = null;
+                        }
+                        if (state.block == BlockingMode.BLOCK && currentState 
!= CompletionState.INLINE) {
+                            notify = true;
+                        } else {
+                            state.state = currentState;
+                        }
+                        if (completion && state.handler != null) {
+                            
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        }
+                        if (notify) {
+                            synchronized (state) {
+                                state.state = currentState;
+                                state.notify();
+                            }
+                        }
+                    } else {
+                        state.run();
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, OperationState<A> state) {
+                IOException ioe;
+                if (exc instanceof InterruptedByTimeoutException) {
+                    ioe = new SocketTimeoutException();
+                } else if (exc instanceof IOException) {
+                    ioe = (IOException) exc;
+                } else {
+                    ioe = new IOException(exc);
+                }
+                setError(ioe);
+                boolean notify = false;
+                state.semaphore.release();
+                if (state.read) {
+                    readOperation = null;
+                } else {
+                    writeOperation = null;
+                }
+                if (state.block == BlockingMode.BLOCK) {
+                    notify = true;
+                } else {
+                    state.state = state.inline ? CompletionState.ERROR : 
CompletionState.DONE;
+                }
+                if (state.handler != null) {
+                    state.handler.failed(ioe, state.attachment);
+                }
+                if (notify) {
+                    synchronized (state) {
+                        state.state = state.inline ? CompletionState.ERROR : 
CompletionState.DONE;
+                        state.notify();
+                    }
+                }
+            }
+        }
+
     }
 
 
@@ -1443,6 +1775,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
     }
 
     // ----------------------------------------------- SendfileData Inner Class
+
     /**
      * SendfileData class.
      */
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 7fef23d..fc07b86 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -50,6 +50,12 @@
       <update>
         Add vectoring for NIO in the base and SSL channels. (remm)
       </update>
+      <add>
+        Add asynchronous IO from NIO2 to the NIO connector, with support for
+        the async IO implementations for HTTP/2 and Websockets. The
+        <code>useAsyncIO</code> boolean attribute on the Connector element
+        allows disabling usage of the asynchronous IO API. (remm)
+      </add>
     </changelog>
   </subsection>
 </section>
diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml
index f3e3f7a..9e68e39 100644
--- a/webapps/docs/config/http.xml
+++ b/webapps/docs/config/http.xml
@@ -760,6 +760,11 @@
         default value is <code>1000</code> milliseconds.</p>
       </attribute>
 
+      <attribute name="useAsyncIO" required="false">
+        <p>(bool)Use this attribute to enable or disable usage of the
+        asynchronous IO API. The default value is <code>true</code>.</p>
+      </attribute>
+
       <attribute name="useSendfile" required="false">
         <p>(bool)Use this attribute to enable or disable sendfile capability.
         The default value is <code>true</code>. Note that the use of sendfile
@@ -899,6 +904,11 @@
 
     <attributes>
 
+      <attribute name="useAsyncIO" required="false">
+        <p>(bool)Use this attribute to enable or disable usage of the
+        asynchronous IO API. The default value is <code>true</code>.</p>
+      </attribute>
+
       <attribute name="useSendfile" required="false">
         <p>(bool)Use this attribute to enable or disable sendfile capability.
         The default value is <code>true</code>. Note that the use of sendfile


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to