Author: norman
Date: Tue Jun 28 10:13:13 2011
New Revision: 1140509
URL: http://svn.apache.org/viewvc?rev=1140509&view=rev
Log:
Use threadpools for mail spooling and also expose them via JMX. See JAMES-1283
Modified:
james/server/trunk/mailetcontainer-library/pom.xml
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
Modified: james/server/trunk/mailetcontainer-library/pom.xml
URL:
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-library/pom.xml?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
--- james/server/trunk/mailetcontainer-library/pom.xml (original)
+++ james/server/trunk/mailetcontainer-library/pom.xml Tue Jun 28 10:13:13 2011
@@ -76,6 +76,10 @@ Parent pom build failure prevents inheri
</dependency>
<dependency>
<groupId>org.apache.james</groupId>
+ <artifactId>james-server-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.james</groupId>
<artifactId>apache-mailet</artifactId>
</dependency>
<dependency>
Modified:
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
---
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
(original)
+++
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/JamesMailSpooler.java
Tue Jun 28 10:13:13 2011
@@ -19,7 +19,7 @@
package org.apache.james.mailetcontainer.lib;
-import java.util.Collection;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,8 +35,10 @@ import org.apache.james.lifecycle.api.Lo
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueue.MailQueueException;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
@@ -45,8 +47,6 @@ import org.slf4j.Logger;
* from the spool, directing messages to the appropriate processor, and
removing
* them from the spool when processing is complete.
*
- * TODO: We should better use a ExecutorService here and only spawn a new
Thread
- * if needed
*/
public class JamesMailSpooler implements Runnable, Configurable, LogEnabled,
MailSpoolerMBean {
@@ -64,8 +64,10 @@ public class JamesMailSpooler implements
private AtomicBoolean active = new AtomicBoolean(false);
/** Spool threads */
- private Collection<Thread> spoolThreads;
-
+ private ExecutorService dequeueService;
+
+ private ExecutorService workerService;
+
/** The mail processor */
private MailProcessor mailProcessor;
@@ -73,6 +75,8 @@ public class JamesMailSpooler implements
private MailQueueFactory queueFactory;
+ private int numDequeueThreads;
+
@Resource(name = "mailqueuefactory")
public void setMailQueueFactory(MailQueueFactory queueFactory) {
this.queueFactory = queueFactory;
@@ -91,6 +95,8 @@ public class JamesMailSpooler implements
* configuration.HierarchicalConfiguration)
*/
public void configure(HierarchicalConfiguration config) throws
ConfigurationException {
+ numDequeueThreads = config.getInt("dequeueThreads", 2);
+
numThreads = config.getInt("threads", 100);
}
@@ -109,11 +115,12 @@ public class JamesMailSpooler implements
}
active.set(true);
- spoolThreads = new java.util.ArrayList<Thread>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- Thread reader = new Thread(this, "Spool Thread #" + i);
- spoolThreads.add(reader);
- reader.start();
+ workerService =
JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool",
"spooler", numThreads);
+ dequeueService =
JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool",
"dequeuer", numDequeueThreads);
+
+ for (int i = 0; i < numDequeueThreads; i++) {
+ Thread reader = new Thread(this, "Dequeue Thread #" + i);
+ dequeueService.execute(reader);
}
}
@@ -131,41 +138,57 @@ public class JamesMailSpooler implements
while (active.get()) {
numActive.incrementAndGet();
+ final MailQueueItem queueItem;
try {
- MailQueueItem queueItem = queue.deQueue();
+ queueItem = queue.deQueue();
+ workerService.execute(new Runnable() {
- // increase count
- processingActive.incrementAndGet();
-
- Mail mail = queueItem.getMail();
- if (logger.isDebugEnabled()) {
- StringBuffer debugBuffer = new
StringBuffer(64).append("==== Begin processing mail
").append(mail.getName()).append("====");
- logger.debug(debugBuffer.toString());
- }
+ @Override
+ public void run() {
+ try {
+ // increase count
+ processingActive.incrementAndGet();
+
+ Mail mail = queueItem.getMail();
+ if (logger.isDebugEnabled()) {
+ StringBuffer debugBuffer = new
StringBuffer(64).append("==== Begin processing mail
").append(mail.getName()).append("====");
+ logger.debug(debugBuffer.toString());
+ }
+
+ try {
+ mailProcessor.service(mail);
+ queueItem.done(true);
+ } catch (Exception e) {
+ if (active.get() && logger.isErrorEnabled()) {
+ logger.error("Exception processing mail
while spooling " + e.getMessage(), e);
+ }
+ queueItem.done(false);
+
+ } finally {
+ LifecycleUtil.dispose(mail);
+ mail = null;
+ }
+ } catch (Throwable e) {
+ if (active.get() && logger.isErrorEnabled()) {
+ logger.error("Exception processing mail while
spooling " + e.getMessage(), e);
+
+ }
+ } finally {
+ processingActive.decrementAndGet();
+ numActive.decrementAndGet();
+ }
- try {
- mailProcessor.service(mail);
- queueItem.done(true);
- } catch (Exception e) {
- if (active.get() && logger.isErrorEnabled()) {
- logger.error("Exception processing mail while spooling
" + e.getMessage(), e);
}
- queueItem.done(false);
-
- } finally {
- LifecycleUtil.dispose(mail);
- mail = null;
- }
-
- } catch (Throwable e) {
+ });
+
+
+ } catch (MailQueueException e1) {
if (active.get() && logger.isErrorEnabled()) {
- logger.error("Exception processing mail while spooling " +
e.getMessage(), e);
+ logger.error("Exception dequeue mail", e1);
}
- } finally {
- processingActive.decrementAndGet();
- numActive.decrementAndGet();
}
+
}
if (logger.isInfoEnabled()) {
@@ -187,10 +210,9 @@ public class JamesMailSpooler implements
public void dispose() {
logger.info(getClass().getName() + " dispose...");
active.set(false); // shutdown the threads
- for (Thread thread : spoolThreads) {
- thread.interrupt(); // interrupt any waiting accept() calls.
- }
-
+ dequeueService.shutdownNow();
+ workerService.shutdown();
+
long stop = System.currentTimeMillis() + 60000;
// give the spooler threads one minute to terminate gracefully
while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
Modified:
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
---
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
(original)
+++
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
Tue Jun 28 10:13:13 2011
@@ -27,6 +27,11 @@ import java.util.concurrent.ScheduledThr
import javax.management.MBeanServer;
import javax.management.ObjectName;
+/**
+ * {@link ScheduledThreadPoolExecutor} which expose statistics via JMX
+ *
+ *
+ */
public class JMXEnabledScheduledThreadPoolExecutor extends
ScheduledThreadPoolExecutor implements
JMXEnabledScheduledThreadPoolExecutorMBean {
private String jmxPath;
@@ -161,4 +166,12 @@ public class JMXEnabledScheduledThreadPo
public int getQueuedTasks() {
return getQueue().size();
}
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutorMBean#getMaximalThreads()
+ */
+ public int getMaximalThreads() {
+ return getMaximumPoolSize();
+ }
}
Modified:
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
---
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
Tue Jun 28 10:13:13 2011
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -160,6 +161,14 @@ public class JMXEnabledThreadPoolExecuto
return getQueue().size();
}
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutorMBean#getMaximalThreads()
+ */
+ public int getMaximalThreads() {
+ return getMaximumPoolSize();
+ }
+
/**
* Create a cached instance of this class. If jmxPath is null it will not
* register itself to the {@link MBeanServer}
@@ -185,4 +194,12 @@ public class JMXEnabledThreadPoolExecuto
public static JMXEnabledThreadPoolExecutor newCachedThreadPool(String
jmxPath, NamedThreadFactory factory) {
return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory, jmxPath);
}
+
+ public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String
jmxPath, int nThreads, NamedThreadFactory threadFactory) {
+ return new JMXEnabledThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory,
jmxPath);
+ }
+
+ public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String
jmxPath, String name, int nThreads) {
+ return newFixedThreadPool(jmxPath, nThreads, new
NamedThreadFactory(name));
+ }
}
Modified:
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java?rev=1140509&r1=1140508&r2=1140509&view=diff
==============================================================================
---
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
(original)
+++
james/server/trunk/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutorMBean.java
Tue Jun 28 10:13:13 2011
@@ -54,4 +54,11 @@ public interface JMXEnabledThreadPoolExe
* @return aTime
*/
double getAverageTaskTime();
+
+ /**
+ * Return the maximal allowed count of threads
+ *
+ * @return maxThreads
+ */
+ int getMaximalThreads();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]