Author: fhanik Date: Thu Apr 12 19:28:01 2007 New Revision: 528323 URL: http://svn.apache.org/viewvc?view=rev&rev=528323 Log: Fix the handling of the Http11NioProcessors when the thread pool can be shrinking and growing. So we are not associating the processor with a thread local, instead going directly to a pool of them
Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java?view=diff&rev=528323&r1=528322&r2=528323 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java Thu Apr 12 19:28:01 2007 @@ -17,6 +17,8 @@ package org.apache.coyote; +import javax.management.ObjectName; + /** * Structure holding the Request and Response objects. It also holds statistical @@ -63,6 +65,7 @@ Response res; int stage = Constants.STAGE_NEW; String workerThreadName; + ObjectName rpName; // -------------------- Information about the current request ----------- // This is usefull for long-running requests only @@ -217,7 +220,15 @@ return workerThreadName; } + public ObjectName getRpName() { + return rpName; + } + public void setWorkerThreadName(String workerThreadName) { this.workerThreadName = workerThreadName; + } + + public void setRpName(ObjectName rpName) { + this.rpName = rpName; } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=528323&r1=528322&r2=528323 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Thu Apr 12 19:28:01 2007 @@ -753,16 +753,14 @@ try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); error = !adapter.event(request, response, status); - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - if ( key != null ) { - NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); - if ( attach!=null ) { + if ( !error ) { + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if (attach != null) { attach.setComet(comet); - Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout"); - if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue()); + Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); + if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); } } - } catch (InterruptedIOException e) { error = true; } catch (Throwable t) { @@ -1072,7 +1070,8 @@ //if this is a comet connection //then execute the connection closure at the next selector loop request.getAttributes().remove("org.apache.tomcat.comet.timeout"); - attach.setError(true); + //attach.setTimeout(5000); //force a cleanup in 5 seconds + //attach.setError(true); //this has caused concurrency errors } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?view=diff&rev=528323&r1=528322&r2=528323 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Thu Apr 12 19:28:01 2007 @@ -22,7 +22,9 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanRegistration; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -212,6 +214,7 @@ private int timeout = 300000; // 5 minutes as in Apache HTTPD server private int maxSavePostSize = 4 * 1024; private int maxHttpHeaderSize = 8 * 1024; + protected int processorCache = 200; //max number of Http11NioProcessor objects cached private int socketCloseDelay=-1; private boolean disableUploadTimeout = true; private int socketBuffer = 9000; @@ -534,6 +537,10 @@ setAttribute("timeout", "" + timeouts); } + public void setProcessorCache(int processorCache) { + this.processorCache = processorCache; + } + public void setOomParachute(int oomParachute) { ep.setOomParachute(oomParachute); setAttribute("oomParachute",oomParachute); @@ -580,12 +587,42 @@ protected static int count = 0; protected RequestGroupInfo global = new RequestGroupInfo(); - protected ThreadLocal<Http11NioProcessor> localProcessor = - new ThreadLocal<Http11NioProcessor>(); protected ConcurrentHashMap<NioChannel, Http11NioProcessor> connections = new ConcurrentHashMap<NioChannel, Http11NioProcessor>(); - protected java.util.Stack<Http11NioProcessor> recycledProcessors = - new java.util.Stack<Http11NioProcessor>(); + protected ConcurrentLinkedQueue<Http11NioProcessor> recycledProcessors = new ConcurrentLinkedQueue<Http11NioProcessor>() { + protected AtomicInteger size = new AtomicInteger(0); + public boolean offer(Http11NioProcessor processor) { + boolean offer = proto.processorCache==-1?true:size.get() < proto.processorCache; + //avoid over growing our cache or add after we have stopped + boolean result = false; + if ( offer ) { + result = super.offer(processor); + if ( result ) { + size.incrementAndGet(); + } + } + if (!result) deregister(processor); + return result; + } + + public Http11NioProcessor poll() { + Http11NioProcessor result = super.poll(); + if ( result != null ) { + size.decrementAndGet(); + } + return result; + } + + public void clear() { + Http11NioProcessor next = poll(); + while ( next != null ) { + deregister(next); + next = poll(); + } + super.clear(); + size.set(0); + } + }; Http11ConnectionHandler(Http11NioProtocol proto) { this.proto = proto; @@ -600,6 +637,7 @@ SocketState state = SocketState.CLOSED; if (result != null) { + if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error); // Call the appropriate event try { state = result.event(status); @@ -626,7 +664,9 @@ } finally { if (state != SocketState.LONG) { connections.remove(socket); - recycledProcessors.push(result); + recycledProcessors.offer(result); + } else { + if (log.isDebugEnabled()) log.debug("Keeping processor["+result); } } } @@ -636,14 +676,8 @@ public SocketState process(NioChannel socket) { Http11NioProcessor processor = null; try { - processor = (Http11NioProcessor) localProcessor.get(); if (processor == null) { - synchronized (recycledProcessors) { - if (!recycledProcessors.isEmpty()) { - processor = recycledProcessors.pop(); - localProcessor.set(processor); - } - } + processor = recycledProcessors.poll(); } if (processor == null) { processor = createProcessor(); @@ -669,9 +703,11 @@ // Associate the connection with the processor. The next request // processed by this thread will use either a new or a recycled // processor. + if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet()); connections.put(socket, processor); - localProcessor.set(null); socket.getPoller().add(socket); + } else { + recycledProcessors.offer(processor); } return state; @@ -696,6 +732,7 @@ Http11NioProtocol.log.error (sm.getString("http11protocol.proto.error"), e); } + recycledProcessors.offer(processor); return SocketState.CLOSED; } @@ -717,24 +754,51 @@ processor.setSocketBuffer(proto.socketBuffer); processor.setMaxSavePostSize(proto.maxSavePostSize); processor.setServer(proto.server); - localProcessor.set(processor); + register(processor); + return processor; + } + AtomicInteger registerCount = new AtomicInteger(0); + public void register(Http11NioProcessor processor) { if (proto.getDomain() != null) { synchronized (this) { try { + registerCount.addAndGet(1); + if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get()); RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); ObjectName rpName = new ObjectName - (proto.getDomain() + ":type=RequestProcessor,worker=" - + proto.getName() + ",name=HttpRequest" + count++); + (proto.getDomain() + ":type=RequestProcessor,worker=" + + proto.getName() + ",name=HttpRequest" + count++); Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + rp.setRpName(rpName); } catch (Exception e) { log.warn("Error registering request"); } } } - return processor; } + + public void deregister(Http11NioProcessor processor) { + if (proto.getDomain() != null) { + synchronized (this) { + try { + registerCount.addAndGet(-1); + if (log.isDebugEnabled()) log.debug("Deregister ["+processor+"] count="+registerCount.get()); + RequestInfo rp = processor.getRequest().getRequestProcessor(); + rp.setGlobalProcessor(null); + ObjectName rpName = rp.getRpName(); + Registry.getRegistry(null, null).unregisterComponent(rpName); + rp.setRpName(null); + } catch (Exception e) { + log.warn("Error unregistering request", e); + } + } + } + } + } + + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(Http11NioProtocol.class); @@ -751,6 +815,10 @@ public String getDomain() { return domain; + } + + public int getProcessorCache() { + return processorCache; } public int getOomParachute() { Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=528323&r1=528322&r2=528323 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Apr 12 19:28:01 2007 @@ -341,7 +341,7 @@ protected boolean useExecutor = true; public void setUseExecutor(boolean useexec) { useExecutor = useexec;} - public boolean getUseExecutor() { return useExecutor;} + public boolean getUseExecutor() { return useExecutor || (executor!=null);} /** * Maximum amount of worker threads. @@ -1250,28 +1250,25 @@ } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { + boolean cancel = false; if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); - //we are registering the key to start with, reset the fairness counter. - att.setFairness(0); - key.interestOps(interestOps); - att.interestOps(interestOps); - } - } - catch (CancelledKeyException ckx) { - try { - if (key != null && key.attachment() != null) { - KeyAttachment ka = (KeyAttachment) key.attachment(); - ka.setError(true); //set to collect this socket immediately - } - try { - socket.close(); + if ( att!=null ) { + //we are registering the key to start with, reset the fairness counter. + att.setFairness(0); + att.interestOps(interestOps); + key.interestOps(interestOps); + } else { + cancel = true; } - catch (Exception ignore) {} - if (socket.isOpen()) - socket.close(true); + } else { + cancel = true; } - catch (Exception ignore) {} + if ( cancel ) getPoller0().cancelledKey(key,SocketStatus.ERROR,false); + }catch (CancelledKeyException ckx) { + try { + getPoller0().cancelledKey(key,SocketStatus.DISCONNECT,true); + }catch (Exception ignore) {} } }//end if }//run @@ -1391,12 +1388,15 @@ try { if ( key == null ) return;//nothing to do KeyAttachment ka = (KeyAttachment) key.attachment(); - if (ka != null && ka.getComet()) { + if (ka != null && ka.getComet() && status != null) { //the comet event takes care of clean up - processSocket(ka.getChannel(), status, dispatch); + //processSocket(ka.getChannel(), status, dispatch); + ka.setComet(false);//to avoid a loop + processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key } if (key.isValid()) key.cancel(); if (key.channel().isOpen()) key.channel().close(); + try {ka.channel.close(true);}catch (Exception ignore){} key.attach(null); } catch (Throwable e) { if ( log.isDebugEnabled() ) log.error("",e); @@ -1521,8 +1521,7 @@ unreg(sk, attachment); boolean close = (!processSocket(channel)); if (close) { - channel.close(); - channel.getIOChannel().socket().close(); + cancelledKey(sk,SocketStatus.DISCONNECT,false); } attachment.setFairness(0); } else { @@ -1601,14 +1600,16 @@ nextExpiration = now + (long)socketProperties.getSoTimeout(); //timeout Set<SelectionKey> keys = selector.keys(); + int keycount = 0; for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); + keycount++; try { KeyAttachment ka = (KeyAttachment) key.attachment(); if ( ka == null ) { cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments } else if ( ka.getError() ) { - cancelledKey(key, SocketStatus.DISCONNECT); + cancelledKey(key, SocketStatus.ERROR); }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); @@ -1631,6 +1632,7 @@ cancelledKey(key, SocketStatus.ERROR); } }//for + if ( log.isDebugEnabled() ) log.debug("Poller processed "+keycount+" keys through timeout"); } } @@ -1873,10 +1875,7 @@ // Close socket and pool try { KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + getPoller0().cancelledKey(key,SocketStatus.ERROR,false); nioChannels.offer(socket); if ( att!=null ) keyCache.offer(att); }catch ( Exception x ) { @@ -1886,10 +1885,7 @@ // Close socket and pool try { KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + getPoller0().cancelledKey(key,SocketStatus.ERROR,false); nioChannels.offer(socket); if ( att!=null ) keyCache.offer(att); }catch ( Exception x ) { @@ -1898,7 +1894,6 @@ } } else if (handshake == -1 ) { socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT); - try {socket.close(true);}catch (IOException ignore){} nioChannels.offer(socket); } else { final SelectionKey fk = key; @@ -2080,13 +2075,14 @@ if (closed) { // Close socket and pool try { - KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + KeyAttachment ka = null; + if (key!=null) { + ka = (KeyAttachment) key.attachment(); + ka.setComet(false); + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); + } nioChannels.offer(socket); - if ( att!=null ) keyCache.offer(att); + if ( ka!=null ) keyCache.offer(ka); }catch ( Exception x ) { log.error("",x); } @@ -2097,7 +2093,6 @@ ka = (KeyAttachment) key.attachment(); socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); } - try {socket.close(true);}catch (IOException ignore){} nioChannels.offer(socket); if ( ka!=null ) keyCache.offer(ka); } else { Modified: tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml?view=diff&rev=528323&r1=528322&r2=528323 ============================================================================== --- tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml (original) +++ tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml Thu Apr 12 19:28:01 2007 @@ -450,6 +450,12 @@ <attribute name="useComet" required="false"> <p>(bool)Whether to allow comet servlets or not, Default value is true.</p> </attribute> + <attribute name="processCache" required="false"> + <p>(int)The protocol handler caches Http11NioProcessor objects to speed up performance. + This setting dictates how many of these objects get cached. + -1 means unlimited, default is 200. Set this value somewhere close to your maxThreads value. + </p> + </attribute> <attribute name="socket.directBuffer" required="false"> <p>(bool)Boolean value, whether to use direct ByteBuffers or java mapped ByteBuffers. Default is <code>false</code> <br/>When you are using direct buffers, make sure you allocate the appropriate amount of memory for the --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]