This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new e813ae0 Remove poller thread count from NIO connector e813ae0 is described below commit e813ae0d9329ebf4b95c02043c39c676edb47d3c Author: remm <r...@apache.org> AuthorDate: Mon May 13 14:40:36 2019 +0200 Remove poller thread count from NIO connector Simplify code when possible. As the poller is set for the connector, only the NioChannel and NioSocketWrapper have a dynamic association. I will close PR163. --- .../apache/coyote/http11/Http11NioProtocol.java | 3 +- .../tomcat/util/net/NioBlockingSelector.java | 4 +- java/org/apache/tomcat/util/net/NioChannel.java | 32 ++--- java/org/apache/tomcat/util/net/NioEndpoint.java | 145 ++++++++------------- webapps/docs/changelog.xml | 6 +- webapps/docs/config/http.xml | 12 -- 6 files changed, 75 insertions(+), 127 deletions(-) diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index e79390b..e27bc89 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -47,11 +47,10 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> { // -------------------- Pool setup -------------------- public void setPollerThreadCount(int count) { - ((NioEndpoint)getEndpoint()).setPollerThreadCount(count); } public int getPollerThreadCount() { - return ((NioEndpoint)getEndpoint()).getPollerThreadCount(); + return 1; } public void setSelectorTimeout(long timeout) { diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java b/java/org/apache/tomcat/util/net/NioBlockingSelector.java index d723c7a..eb8d511 100644 --- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java +++ b/java/org/apache/tomcat/util/net/NioBlockingSelector.java @@ -82,7 +82,7 @@ public class NioBlockingSelector { */ public int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException { - SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); if (key == null) { throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered")); } @@ -158,7 +158,7 @@ public class NioBlockingSelector { * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); if (key == null) { throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered")); } diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java index 4bc865c..01222e6 100644 --- a/java/org/apache/tomcat/util/net/NioChannel.java +++ b/java/org/apache/tomcat/util/net/NioChannel.java @@ -21,11 +21,10 @@ import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; -import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import org.apache.tomcat.util.net.NioEndpoint.Poller; +import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper; import org.apache.tomcat.util.res.StringManager; /** @@ -42,12 +41,10 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering protected static final ByteBuffer emptyBuf = ByteBuffer.allocate(0); protected SocketChannel sc = null; - protected SocketWrapperBase<NioChannel> socketWrapper = null; + protected NioSocketWrapper socketWrapper = null; protected final SocketBufferHandler bufHandler; - protected Poller poller; - public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) { this.sc = channel; 
this.bufHandler = bufHandler; @@ -63,11 +60,18 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering } - void setSocketWrapper(SocketWrapperBase<NioChannel> socketWrapper) { + void setSocketWrapper(NioSocketWrapper socketWrapper) { this.socketWrapper = socketWrapper; } /** + * @return the socketWrapper + */ + NioSocketWrapper getSocketWrapper() { + return socketWrapper; + } + + /** * Free the channel memory */ public void free() { @@ -172,22 +176,10 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering return sc.read(dsts, offset, length); } - public Object getAttachment() { - Poller pol = getPoller(); - Selector sel = pol!=null?pol.getSelector():null; - SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null; - Object att = key!=null?key.attachment():null; - return att; - } - public SocketBufferHandler getBufHandler() { return bufHandler; } - public Poller getPoller() { - return poller; - } - public SocketChannel getIOChannel() { return sc; } @@ -213,10 +205,6 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering return 0; } - public void setPoller(Poller poller) { - this.poller = poller; - } - public void setIOChannel(SocketChannel IOChannel) { this.sc = IOChannel; } diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 1dac62f..0831273 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -44,7 +44,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.ssl.SSLEngine; @@ -147,34 +146,32 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> /** - * Poller thread count. 
+ * NO-OP. + * + * @param pollerThreadCount Unused + * + * @deprecated Will be removed in Tomcat 10. + */ + @Deprecated + public void setPollerThreadCount(int pollerThreadCount) { } + /** + * Always returns 1. + * + * @return Always 1. + * + * @deprecated Will be removed in Tomcat 10. */ - private int pollerThreadCount = 1; - public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } - public int getPollerThreadCount() { return pollerThreadCount; } + @Deprecated + public int getPollerThreadCount() { return 1; } private long selectorTimeout = 1000; public void setSelectorTimeout(long timeout) { this.selectorTimeout = timeout;} public long getSelectorTimeout() { return this.selectorTimeout; } /** - * The socket pollers. + * The socket poller. */ - private Poller[] pollers = null; - private AtomicInteger pollerRotater = new AtomicInteger(0); - /** - * Return an available poller in true round robin fashion. - * - * @return The next poller in sequence - */ - public Poller getPoller0() { - if (pollerThreadCount == 1) { - return pollers[0]; - } else { - int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; - return pollers[idx]; - } - } + private Poller poller = null; public void setSelectorPool(NioSelectorPool selectorPool) { @@ -200,14 +197,10 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> * for the next request to be received on the socket */ public int getKeepAliveCount() { - if (pollers == null) { + if (poller == null) { return 0; } else { - int sum = 0; - for (int i = 0; i < pollers.length; i++) { - sum += pollers[i].getKeyCount(); - } - return sum; + return poller.getKeyCount(); } } @@ -221,16 +214,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public void bind() throws Exception { initServerSocket(); - // Initialize thread count defaults for acceptor, poller - if (acceptorThreadCount == 0) { - // FIXME: Doesn't seem to work that 
well with multiple accept threads - acceptorThreadCount = 1; - } - if (pollerThreadCount <= 0) { - //minimum one poller thread - pollerThreadCount = 1; - } - setStopLatch(new CountDownLatch(pollerThreadCount)); + setStopLatch(new CountDownLatch(1)); // Initialize SSL if needed initialiseSsl(); @@ -290,15 +274,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> initializeConnectionLatch(); - // Start poller threads - pollers = new Poller[getPollerThreadCount()]; - for (int i = 0; i < pollers.length; i++) { - pollers[i] = new Poller(); - Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); - } + // Start poller thread + poller = new Poller(); + Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); startAcceptorThread(); } @@ -315,12 +296,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } if (running) { running = false; - for (int i = 0; pollers != null && i < pollers.length; i++) { - if (pollers[i] == null) { - continue; - } - pollers[i].destroy(); - pollers[i] = null; + if (poller != null) { + poller.destroy(); + poller = null; } try { if (!getStopLatch().await(selectorTimeout + 100, TimeUnit.MILLISECONDS)) { @@ -433,7 +411,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> channel.setIOChannel(socket); channel.reset(); } - getPoller0().register(channel); + poller.register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { @@ -495,7 +473,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private void close(NioChannel socket, SelectionKey key) { try { - if (socket.getPoller().cancelledKey(key) != null) { + Poller poller = this.poller; + if (poller != null && 
poller.cancelledKey(key) != null) { // SocketWrapper (attachment) was removed from the // key - recycle the key. This can only happen once // per attempted closure so it is used to determine @@ -525,33 +504,30 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private NioChannel socket; private int interestOps; - private NioSocketWrapper socketWrapper; - public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { - reset(ch, w, intOps); + public PollerEvent(NioChannel ch, int intOps) { + reset(ch, intOps); } - public void reset(NioChannel ch, NioSocketWrapper w, int intOps) { + public void reset(NioChannel ch, int intOps) { socket = ch; interestOps = intOps; - socketWrapper = w; } public void reset() { - reset(null, null, 0); + reset(null, 0); } @Override public void run() { if (interestOps == OP_REGISTER) { try { - socket.getIOChannel().register( - socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); + socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper()); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { - final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. 
due to socket closure) @@ -571,12 +547,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> socketWrapper.interestOps(ops); key.interestOps(ops); } else { - socket.getPoller().cancelledKey(key); + socket.getSocketWrapper().getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { - socket.getPoller().cancelledKey(key); + socket.getSocketWrapper().getPoller().cancelledKey(key); } catch (Exception ignore) {} } } @@ -584,7 +560,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> @Override public String toString() { - return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper + + return "Poller event: socket [" + socket + "], socketWrapper [" + socket.getSocketWrapper() + "], interestOps [" + interestOps + "]"; } } @@ -638,23 +614,22 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> * of time equal to pollTime (in most cases, latency will be much lower, * however). 
* - * @param socket to add to the poller + * @param socketWrapper to add to the poller * @param interestOps Operations for which to register this socket with * the Poller */ - public void add(final NioChannel socket, final int interestOps) { + public void add(NioSocketWrapper socketWrapper, int interestOps) { PollerEvent r = null; if (eventCache != null) { r = eventCache.pop(); } if (r == null) { - r = new PollerEvent(socket, null, interestOps); + r = new PollerEvent(socketWrapper.getSocket(), interestOps); } else { - r.reset(socket, null, interestOps); + r.reset(socketWrapper.getSocket(), interestOps); } addEvent(r); if (close) { - NioSocketWrapper socketWrapper = (NioSocketWrapper) socket.getAttachment(); processSocket(socketWrapper, SocketEvent.STOP, false); } } @@ -691,7 +666,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> * @param socket The newly created socket */ public void register(final NioChannel socket) { - socket.setPoller(this); NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(socketWrapper); socketWrapper.setPoller(this); @@ -705,9 +679,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> r = eventCache.pop(); } if (r == null) { - r = new PollerEvent(socket, socketWrapper, OP_REGISTER); + r = new PollerEvent(socket, OP_REGISTER); } else { - r.reset(socket, socketWrapper, OP_REGISTER); + r.reset(socket, OP_REGISTER); } addEvent(r); } @@ -943,7 +917,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> log.debug("OP_WRITE for sendfile: " + sd.fileName); } if (calledByProcessor) { - add(socketWrapper.getSocket(), SelectionKey.OP_WRITE); + add(socketWrapper, SelectionKey.OP_WRITE); } else { reg(sk, socketWrapper, SelectionKey.OP_WRITE); } @@ -1290,11 +1264,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> // Ignore } try { - NioSocketWrapper socketWrapper = 
(NioSocketWrapper) channel.getAttachment(); - if (socketWrapper == null) { - throw new IOException(sm.getString("endpoint.nio.keyMustBeCancelled")); - } - nRead = pool.read(to, channel, selector, socketWrapper.getReadTimeout()); + nRead = pool.read(to, channel, selector, getReadTimeout()); } finally { if (selector != null) { pool.put(selector); @@ -1350,13 +1320,13 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> @Override public void registerReadInterest() { - getPoller().add(getSocket(), SelectionKey.OP_READ); + getPoller().add(this, SelectionKey.OP_READ); } @Override public void registerWriteInterest() { - getPoller().add(getSocket(), SelectionKey.OP_WRITE); + getPoller().add(this, SelectionKey.OP_WRITE); } @@ -1369,10 +1339,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> @Override public SendfileState processSendfile(SendfileDataBase sendfileData) { setSendfileData((SendfileData) sendfileData); - SelectionKey key = getSocket().getIOChannel().keyFor( - getSocket().getPoller().getSelector()); + SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector()); // Might as well do the first write on this thread - return getSocket().getPoller().processSendfile(key, this, true); + return getPoller().processSendfile(key, this, true); } @@ -1810,7 +1779,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> @Override protected void doRun() { NioChannel socket = socketWrapper.getSocket(); - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); try { int handshake = -1; @@ -1863,12 +1832,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { - socket.getPoller().cancelledKey(key); + poller.cancelledKey(key); } catch 
(VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error(sm.getString("endpoint.processing.fail"), t); - socket.getPoller().cancelledKey(key); + poller.cancelledKey(key); } finally { socketWrapper = null; event = null; diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 1084c95..f81c36e 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -81,8 +81,12 @@ certain microbenchmarks. (remm) </fix> <fix> - Avoid possible NPEs in on connector stop. (remm) + Avoid possible NPEs on connector stop. (remm) </fix> + <update> + Remove <code>pollerThreadCount</code> Connector attribute for NIO, + one poller thread is sufficient. (remm) + </update> </changelog> </subsection> <subsection name="Other"> diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml index d028861..ce1b6cf 100644 --- a/webapps/docs/config/http.xml +++ b/webapps/docs/config/http.xml @@ -725,18 +725,6 @@ <attributes> - <attribute name="pollerThreadCount" required="false"> - <p>(int)The number of threads to be used to run for the polling events. - Default value is <code>1</code>.<br/> - When accepting a socket, the operating system holds a global lock. So the benefit of - going above 2 threads diminishes rapidly. Having more than one thread is for - system that need to accept connections very rapidly. However usually just - increasing <code>acceptCount</code> will solve that problem. - Increasing this value may also be beneficial when a large amount of send file - operations are going on. - </p> - </attribute> - <attribute name="pollerThreadPriority" required="false"> <p>(int)The priority of the poller threads. The default value is <code>5</code> (the value of the --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org