This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new e813ae0  Remove poller thread count from NIO connector
e813ae0 is described below

commit e813ae0d9329ebf4b95c02043c39c676edb47d3c
Author: remm <r...@apache.org>
AuthorDate: Mon May 13 14:40:36 2019 +0200

    Remove poller thread count from NIO connector
    
    Simplify code when possible. As the poller is set for the connector,
    onlythe NioChannel and NioSocketWrapper have a dynamic association. I
    will close PR163.
---
 .../apache/coyote/http11/Http11NioProtocol.java    |   3 +-
 .../tomcat/util/net/NioBlockingSelector.java       |   4 +-
 java/org/apache/tomcat/util/net/NioChannel.java    |  32 ++---
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 145 ++++++++-------------
 webapps/docs/changelog.xml                         |   6 +-
 webapps/docs/config/http.xml                       |  12 --
 6 files changed, 75 insertions(+), 127 deletions(-)

diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java 
b/java/org/apache/coyote/http11/Http11NioProtocol.java
index e79390b..e27bc89 100644
--- a/java/org/apache/coyote/http11/Http11NioProtocol.java
+++ b/java/org/apache/coyote/http11/Http11NioProtocol.java
@@ -47,11 +47,10 @@ public class Http11NioProtocol extends 
AbstractHttp11JsseProtocol<NioChannel> {
     // -------------------- Pool setup --------------------
 
     public void setPollerThreadCount(int count) {
-        ((NioEndpoint)getEndpoint()).setPollerThreadCount(count);
     }
 
     public int getPollerThreadCount() {
-        return ((NioEndpoint)getEndpoint()).getPollerThreadCount();
+        return 1;
     }
 
     public void setSelectorTimeout(long timeout) {
diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java 
b/java/org/apache/tomcat/util/net/NioBlockingSelector.java
index d723c7a..eb8d511 100644
--- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java
+++ b/java/org/apache/tomcat/util/net/NioBlockingSelector.java
@@ -82,7 +82,7 @@ public class NioBlockingSelector {
      */
     public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
             throws IOException {
-        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
         if (key == null) {
             throw new 
IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
         }
@@ -158,7 +158,7 @@ public class NioBlockingSelector {
      * @throws IOException if an IO Exception occurs in the underlying socket 
logic
      */
     public int read(ByteBuffer buf, NioChannel socket, long readTimeout) 
throws IOException {
-        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
         if (key == null) {
             throw new 
IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
         }
diff --git a/java/org/apache/tomcat/util/net/NioChannel.java 
b/java/org/apache/tomcat/util/net/NioChannel.java
index 4bc865c..01222e6 100644
--- a/java/org/apache/tomcat/util/net/NioChannel.java
+++ b/java/org/apache/tomcat/util/net/NioChannel.java
@@ -21,11 +21,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 
-import org.apache.tomcat.util.net.NioEndpoint.Poller;
+import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 /**
@@ -42,12 +41,10 @@ public class NioChannel implements ByteChannel, 
ScatteringByteChannel, Gathering
     protected static final ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     protected SocketChannel sc = null;
-    protected SocketWrapperBase<NioChannel> socketWrapper = null;
+    protected NioSocketWrapper socketWrapper = null;
 
     protected final SocketBufferHandler bufHandler;
 
-    protected Poller poller;
-
     public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) {
         this.sc = channel;
         this.bufHandler = bufHandler;
@@ -63,11 +60,18 @@ public class NioChannel implements ByteChannel, 
ScatteringByteChannel, Gathering
     }
 
 
-    void setSocketWrapper(SocketWrapperBase<NioChannel> socketWrapper) {
+    void setSocketWrapper(NioSocketWrapper socketWrapper) {
         this.socketWrapper = socketWrapper;
     }
 
     /**
+     * @return the socketWrapper
+     */
+    NioSocketWrapper getSocketWrapper() {
+        return socketWrapper;
+    }
+
+    /**
      * Free the channel memory
      */
     public void free() {
@@ -172,22 +176,10 @@ public class NioChannel implements ByteChannel, 
ScatteringByteChannel, Gathering
         return sc.read(dsts, offset, length);
     }
 
-    public Object getAttachment() {
-        Poller pol = getPoller();
-        Selector sel = pol!=null?pol.getSelector():null;
-        SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null;
-        Object att = key!=null?key.attachment():null;
-        return att;
-    }
-
     public SocketBufferHandler getBufHandler() {
         return bufHandler;
     }
 
-    public Poller getPoller() {
-        return poller;
-    }
-
     public SocketChannel getIOChannel() {
         return sc;
     }
@@ -213,10 +205,6 @@ public class NioChannel implements ByteChannel, 
ScatteringByteChannel, Gathering
         return 0;
     }
 
-    public void setPoller(Poller poller) {
-        this.poller = poller;
-    }
-
     public void setIOChannel(SocketChannel IOChannel) {
         this.sc = IOChannel;
     }
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java 
b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 1dac62f..0831273 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -44,7 +44,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLEngine;
@@ -147,34 +146,32 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
 
     /**
-     * Poller thread count.
+     * NO-OP.
+     *
+     * @param pollerThreadCount Unused
+     *
+     * @deprecated Will be removed in Tomcat 10.
+     */
+    @Deprecated
+    public void setPollerThreadCount(int pollerThreadCount) { }
+    /**
+     * Always returns 1.
+     *
+     * @return Always 1.
+     *
+     * @deprecated Will be removed in Tomcat 10.
      */
-    private int pollerThreadCount = 1;
-    public void setPollerThreadCount(int pollerThreadCount) { 
this.pollerThreadCount = pollerThreadCount; }
-    public int getPollerThreadCount() { return pollerThreadCount; }
+    @Deprecated
+    public int getPollerThreadCount() { return 1; }
 
     private long selectorTimeout = 1000;
     public void setSelectorTimeout(long timeout) { this.selectorTimeout = 
timeout;}
     public long getSelectorTimeout() { return this.selectorTimeout; }
 
     /**
-     * The socket pollers.
+     * The socket poller.
      */
-    private Poller[] pollers = null;
-    private AtomicInteger pollerRotater = new AtomicInteger(0);
-    /**
-     * Return an available poller in true round robin fashion.
-     *
-     * @return The next poller in sequence
-     */
-    public Poller getPoller0() {
-        if (pollerThreadCount == 1) {
-            return pollers[0];
-        } else {
-            int idx = Math.abs(pollerRotater.incrementAndGet()) % 
pollers.length;
-            return pollers[idx];
-        }
-    }
+    private Poller poller = null;
 
 
     public void setSelectorPool(NioSelectorPool selectorPool) {
@@ -200,14 +197,10 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
      *         for the next request to be received on the socket
      */
     public int getKeepAliveCount() {
-        if (pollers == null) {
+        if (poller == null) {
             return 0;
         } else {
-            int sum = 0;
-            for (int i = 0; i < pollers.length; i++) {
-                sum += pollers[i].getKeyCount();
-            }
-            return sum;
+            return poller.getKeyCount();
         }
     }
 
@@ -221,16 +214,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
     public void bind() throws Exception {
         initServerSocket();
 
-        // Initialize thread count defaults for acceptor, poller
-        if (acceptorThreadCount == 0) {
-            // FIXME: Doesn't seem to work that well with multiple accept 
threads
-            acceptorThreadCount = 1;
-        }
-        if (pollerThreadCount <= 0) {
-            //minimum one poller thread
-            pollerThreadCount = 1;
-        }
-        setStopLatch(new CountDownLatch(pollerThreadCount));
+        setStopLatch(new CountDownLatch(1));
 
         // Initialize SSL if needed
         initialiseSsl();
@@ -290,15 +274,12 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
             initializeConnectionLatch();
 
-            // Start poller threads
-            pollers = new Poller[getPollerThreadCount()];
-            for (int i = 0; i < pollers.length; i++) {
-                pollers[i] = new Poller();
-                Thread pollerThread = new Thread(pollers[i], getName() + 
"-ClientPoller-" + i);
-                pollerThread.setPriority(threadPriority);
-                pollerThread.setDaemon(true);
-                pollerThread.start();
-            }
+            // Start poller thread
+            poller = new Poller();
+            Thread pollerThread = new Thread(poller, getName() + 
"-ClientPoller");
+            pollerThread.setPriority(threadPriority);
+            pollerThread.setDaemon(true);
+            pollerThread.start();
 
             startAcceptorThread();
         }
@@ -315,12 +296,9 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         }
         if (running) {
             running = false;
-            for (int i = 0; pollers != null && i < pollers.length; i++) {
-                if (pollers[i] == null) {
-                    continue;
-                }
-                pollers[i].destroy();
-                pollers[i] = null;
+            if (poller != null) {
+                poller.destroy();
+                poller = null;
             }
             try {
                 if (!getStopLatch().await(selectorTimeout + 100, 
TimeUnit.MILLISECONDS)) {
@@ -433,7 +411,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                 channel.setIOChannel(socket);
                 channel.reset();
             }
-            getPoller0().register(channel);
+            poller.register(channel);
         } catch (Throwable t) {
             ExceptionUtils.handleThrowable(t);
             try {
@@ -495,7 +473,8 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
     private void close(NioChannel socket, SelectionKey key) {
         try {
-            if (socket.getPoller().cancelledKey(key) != null) {
+            Poller poller = this.poller;
+            if (poller != null && poller.cancelledKey(key) != null) {
                 // SocketWrapper (attachment) was removed from the
                 // key - recycle the key. This can only happen once
                 // per attempted closure so it is used to determine
@@ -525,33 +504,30 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         private NioChannel socket;
         private int interestOps;
-        private NioSocketWrapper socketWrapper;
 
-        public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
-            reset(ch, w, intOps);
+        public PollerEvent(NioChannel ch, int intOps) {
+            reset(ch, intOps);
         }
 
-        public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
+        public void reset(NioChannel ch, int intOps) {
             socket = ch;
             interestOps = intOps;
-            socketWrapper = w;
         }
 
         public void reset() {
-            reset(null, null, 0);
+            reset(null, 0);
         }
 
         @Override
         public void run() {
             if (interestOps == OP_REGISTER) {
                 try {
-                    socket.getIOChannel().register(
-                            socket.getPoller().getSelector(), 
SelectionKey.OP_READ, socketWrapper);
+                    
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(),
 SelectionKey.OP_READ, socket.getSocketWrapper());
                 } catch (Exception x) {
                     log.error(sm.getString("endpoint.nio.registerFail"), x);
                 }
             } else {
-                final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+                final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
                 try {
                     if (key == null) {
                         // The key was cancelled (e.g. due to socket closure)
@@ -571,12 +547,12 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                             socketWrapper.interestOps(ops);
                             key.interestOps(ops);
                         } else {
-                            socket.getPoller().cancelledKey(key);
+                            
socket.getSocketWrapper().getPoller().cancelledKey(key);
                         }
                     }
                 } catch (CancelledKeyException ckx) {
                     try {
-                        socket.getPoller().cancelledKey(key);
+                        
socket.getSocketWrapper().getPoller().cancelledKey(key);
                     } catch (Exception ignore) {}
                 }
             }
@@ -584,7 +560,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         @Override
         public String toString() {
-            return "Poller event: socket [" + socket + "], socketWrapper [" + 
socketWrapper +
+            return "Poller event: socket [" + socket + "], socketWrapper [" + 
socket.getSocketWrapper() +
                     "], interestOps [" + interestOps + "]";
         }
     }
@@ -638,23 +614,22 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
          * of time equal to pollTime (in most cases, latency will be much 
lower,
          * however).
          *
-         * @param socket to add to the poller
+         * @param socketWrapper to add to the poller
          * @param interestOps Operations for which to register this socket with
          *                    the Poller
          */
-        public void add(final NioChannel socket, final int interestOps) {
+        public void add(NioSocketWrapper socketWrapper, int interestOps) {
             PollerEvent r = null;
             if (eventCache != null) {
                 r = eventCache.pop();
             }
             if (r == null) {
-                r = new PollerEvent(socket, null, interestOps);
+                r = new PollerEvent(socketWrapper.getSocket(), interestOps);
             } else {
-                r.reset(socket, null, interestOps);
+                r.reset(socketWrapper.getSocket(), interestOps);
             }
             addEvent(r);
             if (close) {
-                NioSocketWrapper socketWrapper = (NioSocketWrapper) 
socket.getAttachment();
                 processSocket(socketWrapper, SocketEvent.STOP, false);
             }
         }
@@ -691,7 +666,6 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
          * @param socket    The newly created socket
          */
         public void register(final NioChannel socket) {
-            socket.setPoller(this);
             NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, 
NioEndpoint.this);
             socket.setSocketWrapper(socketWrapper);
             socketWrapper.setPoller(this);
@@ -705,9 +679,9 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                 r = eventCache.pop();
             }
             if (r == null) {
-                r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
+                r = new PollerEvent(socket, OP_REGISTER);
             } else {
-                r.reset(socket, socketWrapper, OP_REGISTER);
+                r.reset(socket, OP_REGISTER);
             }
             addEvent(r);
         }
@@ -943,7 +917,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                         log.debug("OP_WRITE for sendfile: " + sd.fileName);
                     }
                     if (calledByProcessor) {
-                        add(socketWrapper.getSocket(), SelectionKey.OP_WRITE);
+                        add(socketWrapper, SelectionKey.OP_WRITE);
                     } else {
                         reg(sk, socketWrapper, SelectionKey.OP_WRITE);
                     }
@@ -1290,11 +1264,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     // Ignore
                 }
                 try {
-                    NioSocketWrapper socketWrapper = (NioSocketWrapper) 
channel.getAttachment();
-                    if (socketWrapper == null) {
-                        throw new 
IOException(sm.getString("endpoint.nio.keyMustBeCancelled"));
-                    }
-                    nRead = pool.read(to, channel, selector, 
socketWrapper.getReadTimeout());
+                    nRead = pool.read(to, channel, selector, getReadTimeout());
                 } finally {
                     if (selector != null) {
                         pool.put(selector);
@@ -1350,13 +1320,13 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         @Override
         public void registerReadInterest() {
-            getPoller().add(getSocket(), SelectionKey.OP_READ);
+            getPoller().add(this, SelectionKey.OP_READ);
         }
 
 
         @Override
         public void registerWriteInterest() {
-            getPoller().add(getSocket(), SelectionKey.OP_WRITE);
+            getPoller().add(this, SelectionKey.OP_WRITE);
         }
 
 
@@ -1369,10 +1339,9 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         @Override
         public SendfileState processSendfile(SendfileDataBase sendfileData) {
             setSendfileData((SendfileData) sendfileData);
-            SelectionKey key = getSocket().getIOChannel().keyFor(
-                    getSocket().getPoller().getSelector());
+            SelectionKey key = 
getSocket().getIOChannel().keyFor(getPoller().getSelector());
             // Might as well do the first write on this thread
-            return getSocket().getPoller().processSendfile(key, this, true);
+            return getPoller().processSendfile(key, this, true);
         }
 
 
@@ -1810,7 +1779,7 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
         @Override
         protected void doRun() {
             NioChannel socket = socketWrapper.getSocket();
-            SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+            SelectionKey key = 
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
 
             try {
                 int handshake = -1;
@@ -1863,12 +1832,12 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                     socketWrapper.registerWriteInterest();
                 }
             } catch (CancelledKeyException cx) {
-                socket.getPoller().cancelledKey(key);
+                poller.cancelledKey(key);
             } catch (VirtualMachineError vme) {
                 ExceptionUtils.handleThrowable(vme);
             } catch (Throwable t) {
                 log.error(sm.getString("endpoint.processing.fail"), t);
-                socket.getPoller().cancelledKey(key);
+                poller.cancelledKey(key);
             } finally {
                 socketWrapper = null;
                 event = null;
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 1084c95..f81c36e 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -81,8 +81,12 @@
         certain microbenchmarks. (remm)
       </fix>
       <fix>
-        Avoid possible NPEs in on connector stop. (remm)
+        Avoid possible NPEs on connector stop. (remm)
       </fix>
+      <update>
+        Remove <code>pollerThreadCount</code> Connector attribute for NIO,
+        one poller thread is sufficient. (remm)
+      </update>
     </changelog>
   </subsection>
   <subsection name="Other">
diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml
index d028861..ce1b6cf 100644
--- a/webapps/docs/config/http.xml
+++ b/webapps/docs/config/http.xml
@@ -725,18 +725,6 @@
 
     <attributes>
 
-      <attribute name="pollerThreadCount" required="false">
-        <p>(int)The number of threads to be used to run for the polling events.
-        Default value is <code>1</code>.<br/>
-        When accepting a socket, the operating system holds a global lock. So 
the benefit of
-        going above 2 threads diminishes rapidly. Having more than one thread 
is for
-        system that need to accept connections very rapidly. However usually 
just
-        increasing <code>acceptCount</code> will solve that problem.
-        Increasing this value may also be beneficial when a large amount of 
send file
-        operations are going on.
-        </p>
-      </attribute>
-
       <attribute name="pollerThreadPriority" required="false">
         <p>(int)The priority of the poller threads.
         The default value is <code>5</code> (the value of the


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to