Author: norman
Date: Tue Jun 28 10:13:13 2011
New Revision: 1140509

URL: http://svn.apache.org/viewvc?rev=1140509&view=rev
Log:
Use threadpools for mail spooling and also expose them via JMX. See JAMES-1283

Modified:
    james/server/trunk/mailetcontainer-library/pom.xml
    
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
    
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
    
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
    
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java

Modified: james/server/trunk/mailetcontainer-library/pom.xml
URL: 
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-library/pom.xml?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- james/server/trunk/mailetcontainer-library/pom.xml (original)
+++ james/server/trunk/mailetcontainer-library/pom.xml Tue Jun 28 10:13:13 2011
@@ -76,6 +76,10 @@ Parent pom build failure prevents inheri
     </dependency>
     <dependency>
       <groupId>org.apache.james</groupId>
+      <artifactId>james-server-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.james</groupId>
       <artifactId>apache-mailet</artifactId>
     </dependency>  
     <dependency>

Modified: 
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- 
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
 (original)
+++ 
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
 Tue Jun 28 10:13:13 2011
@@ -19,7 +19,7 @@
 
 package org.apache.james.mailetcontainer.lib;
 
-import java.util.Collection;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -35,8 +35,10 @@ import org.apache.james.lifecycle.api.Lo
 import org.apache.james.mailetcontainer.api.MailProcessor;
 import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
 import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueue.MailQueueException;
 import org.apache.james.queue.api.MailQueue.MailQueueItem;
 import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 
@@ -45,8 +47,6 @@ import org.slf4j.Logger;
  * from the spool, directing messages to the appropriate processor, and 
removing
  * them from the spool when processing is complete.
  * 
- * TODO: We should better use a ExecutorService here and only spawn a new 
Thread
- * if needed
  */
 public class JamesMailSpooler implements Runnable, Configurable, LogEnabled, 
MailSpoolerMBean {
 
@@ -64,8 +64,10 @@ public class JamesMailSpooler implements
     private AtomicBoolean active = new AtomicBoolean(false);
 
     /** Spool threads */
-    private Collection<Thread> spoolThreads;
-
+    private ExecutorService dequeueService;
+    
+    private ExecutorService workerService;
+    
     /** The mail processor */
     private MailProcessor mailProcessor;
 
@@ -73,6 +75,8 @@ public class JamesMailSpooler implements
 
     private MailQueueFactory queueFactory;
 
+    private int numDequeueThreads;
+
     @Resource(name = "mailqueuefactory")
     public void setMailQueueFactory(MailQueueFactory queueFactory) {
         this.queueFactory = queueFactory;
@@ -91,6 +95,8 @@ public class JamesMailSpooler implements
      * configuration.HierarchicalConfiguration)
      */
     public void configure(HierarchicalConfiguration config) throws 
ConfigurationException {
+        numDequeueThreads = config.getInt("dequeueThreads", 2);
+
         numThreads = config.getInt("threads", 100);
     }
 
@@ -109,11 +115,12 @@ public class JamesMailSpooler implements
         }
 
         active.set(true);
-        spoolThreads = new java.util.ArrayList<Thread>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            Thread reader = new Thread(this, "Spool Thread #" + i);
-            spoolThreads.add(reader);
-            reader.start();
+        workerService = 
JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool",
 "spooler", numThreads);
+        dequeueService = 
JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool",
 "dequeuer", numDequeueThreads);
+        
+        for (int i = 0; i < numDequeueThreads; i++) {
+            Thread reader = new Thread(this, "Dequeue Thread #" + i);
+            dequeueService.execute(reader);
         }
     }
 
@@ -131,41 +138,57 @@ public class JamesMailSpooler implements
         while (active.get()) {
             numActive.incrementAndGet();
 
+            final MailQueueItem queueItem;
             try {
-                MailQueueItem queueItem = queue.deQueue();
+                queueItem = queue.deQueue();
+                workerService.execute(new Runnable() {
 
-                // increase count
-                processingActive.incrementAndGet();
-
-                Mail mail = queueItem.getMail();
-                if (logger.isDebugEnabled()) {
-                    StringBuffer debugBuffer = new 
StringBuffer(64).append("==== Begin processing mail 
").append(mail.getName()).append("====");
-                    logger.debug(debugBuffer.toString());
-                }
+                    @Override
+                    public void run() {
+                        try {
+                            // increase count
+                            processingActive.incrementAndGet();
+
+                            Mail mail = queueItem.getMail();
+                            if (logger.isDebugEnabled()) {
+                                StringBuffer debugBuffer = new 
StringBuffer(64).append("==== Begin processing mail 
").append(mail.getName()).append("====");
+                                logger.debug(debugBuffer.toString());
+                            }
+
+                            try {
+                                mailProcessor.service(mail);
+                                queueItem.done(true);
+                            } catch (Exception e) {
+                                if (active.get() && logger.isErrorEnabled()) {
+                                    logger.error("Exception processing mail 
while spooling " + e.getMessage(), e);
+                                }
+                                queueItem.done(false);
+
+                            } finally {
+                                LifecycleUtil.dispose(mail);
+                                mail = null;
+                            }
+                        } catch (Throwable e) {
+                            if (active.get() && logger.isErrorEnabled()) {
+                                logger.error("Exception processing mail while 
spooling " + e.getMessage(), e);
+
+                            }
+                        } finally {
+                            processingActive.decrementAndGet();
+                            numActive.decrementAndGet();
+                        }
 
-                try {
-                    mailProcessor.service(mail);
-                    queueItem.done(true);
-                } catch (Exception e) {
-                    if (active.get() && logger.isErrorEnabled()) {
-                        logger.error("Exception processing mail while spooling 
" + e.getMessage(), e);
                     }
-                    queueItem.done(false);
-
-                } finally {
-                    LifecycleUtil.dispose(mail);
-                    mail = null;
-                }
-
-            } catch (Throwable e) {
+                });
+                  
+               
+            } catch (MailQueueException e1) {
                 if (active.get() && logger.isErrorEnabled()) {
-                    logger.error("Exception processing mail while spooling " + 
e.getMessage(), e);
+                    logger.error("Exception dequeue mail", e1);
 
                 }
-            } finally {
-                processingActive.decrementAndGet();
-                numActive.decrementAndGet();
             }
+          
 
         }
         if (logger.isInfoEnabled()) {
@@ -187,10 +210,9 @@ public class JamesMailSpooler implements
     public void dispose() {
         logger.info(getClass().getName() + " dispose...");
         active.set(false); // shutdown the threads
-        for (Thread thread : spoolThreads) {
-            thread.interrupt(); // interrupt any waiting accept() calls.
-        }
-
+        dequeueService.shutdownNow();
+        workerService.shutdown();
+        
         long stop = System.currentTimeMillis() + 60000;
         // give the spooler threads one minute to terminate gracefully
         while (numActive.get() != 0 && stop > System.currentTimeMillis()) {

Modified: 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
 (original)
+++ 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
 Tue Jun 28 10:13:13 2011
@@ -27,6 +27,11 @@ import java.util.concurrent.ScheduledThr
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+/**
+ * {@link ScheduledThreadPoolExecutor} which exposes statistics via JMX
+ * 
+ *
+ */
 public class JMXEnabledScheduledThreadPoolExecutor extends 
ScheduledThreadPoolExecutor implements 
JMXEnabledScheduledThreadPoolExecutorMBean {
 
     private String jmxPath;
@@ -161,4 +166,12 @@ public class JMXEnabledScheduledThreadPo
     public int getQueuedTasks() {
         return getQueue().size();
     }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutorMBean#getMaximalThreads()
+     */
+    public int getMaximalThreads() {
+        return getMaximumPoolSize();
+    }
 }

Modified: 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
 (original)
+++ 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
 Tue Jun 28 10:13:13 2011
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -160,6 +161,14 @@ public class JMXEnabledThreadPoolExecuto
         return getQueue().size();
     }
 
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutorMBean#getMaximalThreads()
+     */
+    public int getMaximalThreads() {
+        return getMaximumPoolSize();
+    }
+
     /**
      * Create a cached instance of this class. If jmxPath is null it will not
      * register itself to the {@link MBeanServer}
@@ -185,4 +194,12 @@ public class JMXEnabledThreadPoolExecuto
     public static JMXEnabledThreadPoolExecutor newCachedThreadPool(String 
jmxPath, NamedThreadFactory factory) {
         return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory, jmxPath);
     }
+    
+    public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String 
jmxPath, int nThreads, NamedThreadFactory threadFactory) {
+        return new JMXEnabledThreadPoolExecutor(nThreads, nThreads, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, 
jmxPath);
+    }
+    
+    public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String 
jmxPath, String name, int nThreads) {
+        return newFixedThreadPool(jmxPath, nThreads, new 
NamedThreadFactory(name));
+    }
 }

Modified: 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
 (original)
+++ 
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
 Tue Jun 28 10:13:13 2011
@@ -54,4 +54,11 @@ public interface JMXEnabledThreadPoolExe
      * @return aTime
      */
     double getAverageTaskTime();
+    
+    /**
+     * Return the maximum number of threads allowed
+     * 
+     * @return maxThreads
+     */
+    int getMaximalThreads();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to