Author: fhanik
Date: Thu Apr 12 19:28:01 2007
New Revision: 528323

URL: http://svn.apache.org/viewvc?view=rev&rev=528323
Log:
Fix the handling of Http11NioProcessors when the thread pool can shrink and 
grow.
Processors are no longer associated with a thread local; instead they are 
taken directly from a pool.


Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java?view=diff&rev=528323&r1=528322&r2=528323
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/RequestInfo.java Thu Apr 12 
19:28:01 2007
@@ -17,6 +17,8 @@
 
 package org.apache.coyote;
 
+import javax.management.ObjectName;
+
 
 /**
  * Structure holding the Request and Response objects. It also holds 
statistical
@@ -63,6 +65,7 @@
     Response res;
     int stage = Constants.STAGE_NEW;
     String workerThreadName;
+    ObjectName rpName;
 
     // -------------------- Information about the current request  -----------
     // This is usefull for long-running requests only
@@ -217,7 +220,15 @@
         return workerThreadName;
     }
 
+    public ObjectName getRpName() {
+        return rpName;
+    }
+
     public void setWorkerThreadName(String workerThreadName) {
         this.workerThreadName = workerThreadName;
+    }
+
+    public void setRpName(ObjectName rpName) {
+        this.rpName = rpName;
     }
 }

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=528323&r1=528322&r2=528323
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
Thu Apr 12 19:28:01 2007
@@ -753,16 +753,14 @@
         try {
             rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
             error = !adapter.event(request, response, status);
-            SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-            if ( key != null ) {
-                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) 
key.attachment();
-                if ( attach!=null ) {
+            if ( !error ) {
+                NioEndpoint.KeyAttachment attach = 
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                if (attach != null) {
                     attach.setComet(comet);
-                    Integer comettimeout = 
(Integer)request.getAttribute("org.apache.tomcat.comet.timeout");
-                    if ( comettimeout != null ) 
attach.setTimeout(comettimeout.longValue());
+                    Integer comettimeout = (Integer) 
request.getAttribute("org.apache.tomcat.comet.timeout");
+                    if (comettimeout != null) 
attach.setTimeout(comettimeout.longValue());
                 }
             }
-            
         } catch (InterruptedIOException e) {
             error = true;
         } catch (Throwable t) {
@@ -1072,7 +1070,8 @@
                     //if this is a comet connection
                     //then execute the connection closure at the next selector 
loop
                     
request.getAttributes().remove("org.apache.tomcat.comet.timeout");
-                    attach.setError(true);
+                    //attach.setTimeout(5000); //force a cleanup in 5 seconds
+                    //attach.setError(true); //this has caused concurrency 
errors
                 }
             }
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?view=diff&rev=528323&r1=528322&r2=528323
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java 
Thu Apr 12 19:28:01 2007
@@ -22,7 +22,9 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanRegistration;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -212,6 +214,7 @@
     private int timeout = 300000;   // 5 minutes as in Apache HTTPD server
     private int maxSavePostSize = 4 * 1024;
     private int maxHttpHeaderSize = 8 * 1024;
+    protected int processorCache = 200; //max number of Http11NioProcessor 
objects cached
     private int socketCloseDelay=-1;
     private boolean disableUploadTimeout = true;
     private int socketBuffer = 9000;
@@ -534,6 +537,10 @@
         setAttribute("timeout", "" + timeouts);
     }
 
+    public void setProcessorCache(int processorCache) {
+        this.processorCache = processorCache;
+    }
+
     public void setOomParachute(int oomParachute) {
         ep.setOomParachute(oomParachute);
         setAttribute("oomParachute",oomParachute);
@@ -580,12 +587,42 @@
         protected static int count = 0;
         protected RequestGroupInfo global = new RequestGroupInfo();
 
-        protected ThreadLocal<Http11NioProcessor> localProcessor = 
-            new ThreadLocal<Http11NioProcessor>();
         protected ConcurrentHashMap<NioChannel, Http11NioProcessor> 
connections =
             new ConcurrentHashMap<NioChannel, Http11NioProcessor>();
-        protected java.util.Stack<Http11NioProcessor> recycledProcessors = 
-            new java.util.Stack<Http11NioProcessor>();
+        protected ConcurrentLinkedQueue<Http11NioProcessor> recycledProcessors 
= new ConcurrentLinkedQueue<Http11NioProcessor>() {
+            protected AtomicInteger size = new AtomicInteger(0);
+            public boolean offer(Http11NioProcessor processor) {
+                boolean offer = proto.processorCache==-1?true:size.get() < 
proto.processorCache;
+                //avoid over growing our cache or add after we have stopped
+                boolean result = false;
+                if ( offer ) {
+                    result = super.offer(processor);
+                    if ( result ) {
+                        size.incrementAndGet();
+                    }
+                }
+                if (!result) deregister(processor);
+                return result;
+            }
+            
+            public Http11NioProcessor poll() {
+                Http11NioProcessor result = super.poll();
+                if ( result != null ) {
+                    size.decrementAndGet();
+                }
+                return result;
+            }
+            
+            public void clear() {
+                Http11NioProcessor next = poll();
+                while ( next != null ) {
+                    deregister(next);
+                    next = poll();
+                }
+                super.clear();
+                size.set(0);
+            }
+        };
 
         Http11ConnectionHandler(Http11NioProtocol proto) {
             this.proto = proto;
@@ -600,6 +637,7 @@
 
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
+                if (log.isDebugEnabled()) 
log.debug("Http11NioProcessor.error="+result.error);
                 // Call the appropriate event
                 try {
                     state = result.event(status);
@@ -626,7 +664,9 @@
                 } finally {
                     if (state != SocketState.LONG) {
                         connections.remove(socket);
-                        recycledProcessors.push(result);
+                        recycledProcessors.offer(result);
+                    } else {
+                        if (log.isDebugEnabled()) log.debug("Keeping 
processor["+result);
                     }
                 }
             }
@@ -636,14 +676,8 @@
         public SocketState process(NioChannel socket) {
             Http11NioProcessor processor = null;
             try {
-                processor = (Http11NioProcessor) localProcessor.get();
                 if (processor == null) {
-                    synchronized (recycledProcessors) {
-                        if (!recycledProcessors.isEmpty()) {
-                            processor = recycledProcessors.pop();
-                            localProcessor.set(processor);
-                        }
-                    }
+                    processor = recycledProcessors.poll();
                 }
                 if (processor == null) {
                     processor = createProcessor();
@@ -669,9 +703,11 @@
                     // Associate the connection with the processor. The next 
request 
                     // processed by this thread will use either a new or a 
recycled
                     // processor.
+                    if (log.isDebugEnabled()) log.debug("Not recycling 
["+processor+"] 
Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
                     connections.put(socket, processor);
-                    localProcessor.set(null);
                     socket.getPoller().add(socket);
+                } else {
+                    recycledProcessors.offer(processor);
                 }
                 return state;
 
@@ -696,6 +732,7 @@
                 Http11NioProtocol.log.error
                     (sm.getString("http11protocol.proto.error"), e);
             }
+            recycledProcessors.offer(processor);
             return SocketState.CLOSED;
         }
 
@@ -717,24 +754,51 @@
             processor.setSocketBuffer(proto.socketBuffer);
             processor.setMaxSavePostSize(proto.maxSavePostSize);
             processor.setServer(proto.server);
-            localProcessor.set(processor);
+            register(processor);
+            return processor;
+        }
+        AtomicInteger registerCount = new AtomicInteger(0);
+        public void register(Http11NioProcessor processor) {
             if (proto.getDomain() != null) {
                 synchronized (this) {
                     try {
+                        registerCount.addAndGet(1);
+                        if (log.isDebugEnabled()) log.debug("Register 
["+processor+"] count="+registerCount.get());
                         RequestInfo rp = 
processor.getRequest().getRequestProcessor();
                         rp.setGlobalProcessor(global);
                         ObjectName rpName = new ObjectName
-                        (proto.getDomain() + ":type=RequestProcessor,worker="
-                                + proto.getName() + ",name=HttpRequest" + 
count++);
+                            (proto.getDomain() + 
":type=RequestProcessor,worker="
+                             + proto.getName() + ",name=HttpRequest" + 
count++);
                         Registry.getRegistry(null, null).registerComponent(rp, 
rpName, null);
+                        rp.setRpName(rpName);
                     } catch (Exception e) {
                         log.warn("Error registering request");
                     }
                 }
             }
-            return processor;
         }
+    
+        public void deregister(Http11NioProcessor processor) {
+            if (proto.getDomain() != null) {
+                synchronized (this) {
+                    try {
+                        registerCount.addAndGet(-1);
+                        if (log.isDebugEnabled()) log.debug("Deregister 
["+processor+"] count="+registerCount.get());
+                        RequestInfo rp = 
processor.getRequest().getRequestProcessor();
+                        rp.setGlobalProcessor(null);
+                        ObjectName rpName = rp.getRpName();
+                        Registry.getRegistry(null, 
null).unregisterComponent(rpName);
+                        rp.setRpName(null);
+                    } catch (Exception e) {
+                        log.warn("Error unregistering request", e);
+                    }
+                }
+            }
+        }
+
     }
+    
+
 
     protected static org.apache.juli.logging.Log log
         = org.apache.juli.logging.LogFactory.getLog(Http11NioProtocol.class);
@@ -751,6 +815,10 @@
 
     public String getDomain() {
         return domain;
+    }
+
+    public int getProcessorCache() {
+        return processorCache;
     }
 
     public int getOomParachute() {

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=528323&r1=528322&r2=528323
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu 
Apr 12 19:28:01 2007
@@ -341,7 +341,7 @@
     
     protected boolean useExecutor = true;
     public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
-    public boolean getUseExecutor() { return useExecutor;}
+    public boolean getUseExecutor() { return useExecutor || (executor!=null);}
 
     /**
      * Maximum amount of worker threads.
@@ -1250,28 +1250,25 @@
             } else {
                 final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                 try {
+                    boolean cancel = false;
                     if (key != null) {
                         final KeyAttachment att = (KeyAttachment) 
key.attachment();
-                        //we are registering the key to start with, reset the 
fairness counter.
-                        att.setFairness(0);
-                        key.interestOps(interestOps);
-                        att.interestOps(interestOps);
-                    }
-                }
-                catch (CancelledKeyException ckx) {
-                    try {
-                        if (key != null && key.attachment() != null) {
-                            KeyAttachment ka = (KeyAttachment) 
key.attachment();
-                            ka.setError(true); //set to collect this socket 
immediately
-                        }
-                        try {
-                            socket.close();
+                        if ( att!=null ) {
+                            //we are registering the key to start with, reset 
the fairness counter.
+                            att.setFairness(0);
+                            att.interestOps(interestOps);
+                            key.interestOps(interestOps);
+                        } else {
+                            cancel = true;
                         }
-                        catch (Exception ignore) {}
-                        if (socket.isOpen())
-                            socket.close(true);
+                    } else {
+                        cancel = true;
                     }
-                    catch (Exception ignore) {}
+                    if ( cancel ) 
getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
+                }catch (CancelledKeyException ckx) {
+                    try {
+                        
getPoller0().cancelledKey(key,SocketStatus.DISCONNECT,true);
+                    }catch (Exception ignore) {}
                 }
             }//end if
         }//run
@@ -1391,12 +1388,15 @@
             try {
                 if ( key == null ) return;//nothing to do
                 KeyAttachment ka = (KeyAttachment) key.attachment();
-                if (ka != null && ka.getComet()) {
+                if (ka != null && ka.getComet() && status != null) {
                     //the comet event takes care of clean up
-                    processSocket(ka.getChannel(), status, dispatch);
+                    //processSocket(ka.getChannel(), status, dispatch);
+                    ka.setComet(false);//to avoid a loop
+                    processSocket(ka.getChannel(), status, false);//don't 
dispatch if the lines below are cancelling the key
                 }
                 if (key.isValid()) key.cancel();
                 if (key.channel().isOpen()) key.channel().close();
+                try {ka.channel.close(true);}catch (Exception ignore){}
                 key.attach(null);
             } catch (Throwable e) {
                 if ( log.isDebugEnabled() ) log.error("",e);
@@ -1521,8 +1521,7 @@
                                 unreg(sk, attachment);
                                 boolean close = (!processSocket(channel));
                                 if (close) {
-                                    channel.close();
-                                    channel.getIOChannel().socket().close();
+                                    
cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                 }
                                 attachment.setFairness(0);
                             } else {
@@ -1601,14 +1600,16 @@
             nextExpiration = now + (long)socketProperties.getSoTimeout();
             //timeout
             Set<SelectionKey> keys = selector.keys();
+            int keycount = 0;
             for (Iterator<SelectionKey> iter = keys.iterator(); 
iter.hasNext(); ) {
                 SelectionKey key = iter.next();
+                keycount++;
                 try {
                     KeyAttachment ka = (KeyAttachment) key.attachment();
                     if ( ka == null ) {
                         cancelledKey(key, SocketStatus.ERROR); //we don't 
support any keys without attachments
                     } else if ( ka.getError() ) {
-                        cancelledKey(key, SocketStatus.DISCONNECT);
+                        cancelledKey(key, SocketStatus.ERROR);
                     }else if ((ka.interestOps()&SelectionKey.OP_READ) == 
SelectionKey.OP_READ) {
                         //only timeout sockets that we are waiting for a read 
from
                         long delta = now - ka.getLastAccess();
@@ -1631,6 +1632,7 @@
                     cancelledKey(key, SocketStatus.ERROR);
                 }
             }//for
+            if ( log.isDebugEnabled() ) log.debug("Poller processed 
"+keycount+" keys through timeout");
         }
     }
 
@@ -1873,10 +1875,7 @@
                                 // Close socket and pool
                                 try {
                                     KeyAttachment att = 
(KeyAttachment)socket.getAttachment(true);
-                                    try {socket.close();}catch (Exception 
ignore){}
-                                    if ( socket.isOpen() ) socket.close(true);
-                                    key.cancel();
-                                    key.attach(null);
+                                    
getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
                                     nioChannels.offer(socket);
                                     if ( att!=null ) keyCache.offer(att);
                                 }catch ( Exception x ) {
@@ -1886,10 +1885,7 @@
                                 // Close socket and pool
                                 try {
                                     KeyAttachment att = 
(KeyAttachment)socket.getAttachment(true);
-                                    try {socket.close();}catch (Exception 
ignore){}
-                                    if ( socket.isOpen() ) socket.close(true);
-                                    key.cancel();
-                                    key.attach(null);
+                                    
getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
                                     nioChannels.offer(socket);
                                     if ( att!=null ) keyCache.offer(att);
                                 }catch ( Exception x ) {
@@ -1898,7 +1894,6 @@
                             }
                         } else if (handshake == -1 ) {
                             
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
-                            try {socket.close(true);}catch (IOException 
ignore){}
                             nioChannels.offer(socket);
                         } else {
                             final SelectionKey fk = key;
@@ -2080,13 +2075,14 @@
                     if (closed) {
                         // Close socket and pool
                         try {
-                            KeyAttachment att = 
(KeyAttachment)socket.getAttachment(true);
-                            try {socket.close();}catch (Exception ignore){}
-                            if ( socket.isOpen() ) socket.close(true);
-                            key.cancel();
-                            key.attach(null);
+                            KeyAttachment ka = null;
+                            if (key!=null) {
+                                ka = (KeyAttachment) key.attachment();
+                                ka.setComet(false);
+                                socket.getPoller().cancelledKey(key, 
SocketStatus.DISCONNECT, false);
+                            }
                             nioChannels.offer(socket);
-                            if ( att!=null ) keyCache.offer(att);
+                            if ( ka!=null ) keyCache.offer(ka);
                         }catch ( Exception x ) {
                             log.error("",x);
                         }
@@ -2097,7 +2093,6 @@
                         ka = (KeyAttachment) key.attachment();
                         socket.getPoller().cancelledKey(key, 
SocketStatus.DISCONNECT, false);
                     }
-                    try {socket.close(true);}catch (IOException ignore){}
                     nioChannels.offer(socket);
                     if ( ka!=null ) keyCache.offer(ka);
                 } else {

Modified: tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml?view=diff&rev=528323&r1=528322&r2=528323
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml Thu Apr 12 19:28:01 2007
@@ -450,6 +450,12 @@
       <attribute name="useComet" required="false">
         <p>(bool)Whether to allow comet servlets or not, Default value is 
true.</p>
       </attribute>
+      <attribute name="processorCache" required="false">
+        <p>(int)The protocol handler caches Http11NioProcessor objects to 
speed up performance.
+           This setting dictates how many of these objects get cached.
+           -1 means unlimited, default is 200. Set this value somewhere close 
to your maxThreads value.
+        </p>
+      </attribute>
       <attribute name="socket.directBuffer" required="false">
         <p>(bool)Boolean value, whether to use direct ByteBuffers or java 
mapped ByteBuffers. Default is <code>false</code>
            <br/>When you are using direct buffers, make sure you allocate the 
appropriate amount of memory for the 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to