Author: norman
Date: Thu Nov 11 20:48:03 2010
New Revision: 1034112
URL: http://svn.apache.org/viewvc?rev=1034112&view=rev
Log:
Expose JMX management for MailQueue
Added:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
Thu Nov 11 20:48:03 2010
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import javax.jms.Connection;
@@ -37,15 +39,18 @@ import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.SharedInputStream;
+import javax.management.NotCompliantMBeanException;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
+import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.logging.Log;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStream;
import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.core.MimeMessageSource;
import org.apache.james.core.MimeMessageWrapper;
+import org.apache.james.lifecycle.Disposable;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
@@ -78,17 +83,19 @@ import org.springframework.jms.connectio
*
*
*/
-public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
+public class ActiveMQMailQueue extends JMSMailQueue implements
ActiveMQSupport, Disposable{
private boolean useBlob;
-
+
+
/**
* Construct a {@link ActiveMQMailQueue} which only uses {@link BlobMessage}
+ * @throws NotCompliantMBeanException
*
* @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
- this(connectionFactory, queuename, true, logger);
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) throws NotCompliantMBeanException {
+ this(connectionFactory, queuename, true, true, logger);
}
/**
@@ -101,11 +108,13 @@ public class ActiveMQMailQueue extends J
* @param queuename
* @param useBlob
* @param logger
+ * @throws NotCompliantMBeanException
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, boolean useBlob, final Log logger) {
- super(connectionFactory, queuename, logger);
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, boolean useBlob, final boolean useJMX, final Log logger)
throws NotCompliantMBeanException {
+ super(connectionFactory, queuename, useJMX, logger);
this.useBlob = useBlob;
}
+
/*
* (non-Javadoc)
@@ -315,4 +324,76 @@ public class ActiveMQMailQueue extends J
return new ActiveMQMailQueueItem(mail, connection, session, consumer,
message, logger);
}
+    /*
+     * (non-Javadoc)
+     *
+     * Removes the messages matching the given selector and, once the removal
+     * has been committed, deletes the blob file of every removed
+     * ActiveMQBlobMessage. Returns the number of removed messages, or -1 if
+     * the removal failed.
+     *
+     * @see org.apache.james.queue.jms.JMSMailQueue#removeWithSelector(java.lang.String)
+     */
+    protected long removeWithSelector(String selector) {
+        Connection connection = null;
+        Session session = null;
+        MessageConsumer consumer = null;
+        long count = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue, selector);
+
+            // Drain everything that is immediately available for this selector
+            List<Message> messages = new ArrayList<Message>();
+            Message message;
+            while ((message = consumer.receiveNoWait()) != null) {
+                messages.add(message);
+                count++;
+            }
+            session.commit();
+
+            // Blob files are only deleted after the commit, so a rollback
+            // never leaves a message in the queue without its file
+            for (Message m : messages) {
+                if (m instanceof ActiveMQBlobMessage) {
+                    try {
+                        ((ActiveMQBlobMessage) m).deleteFile();
+                    } catch (IOException e) {
+                        logger.error("Unable to delete blob file for message " + m, e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Unable to remove mails with selector " + selector + " from queue " + queuename, e);
+            count = -1;
+            // session is still null when creating the connection or session failed
+            if (session != null) {
+                try {
+                    session.rollback();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+        } finally {
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+        }
+        return count;
+    }
}
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
Thu Nov 11 20:48:03 2010
@@ -19,6 +19,8 @@
package org.apache.james.queue.activemq;
+import javax.management.NotCompliantMBeanException;
+
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -38,10 +40,14 @@ public class ActiveMQMailQueueFactory ex
public void setUseBlobMessages(boolean useBlob){
this.useBlob = useBlob;
}
+
@Override
- protected MailQueue createMailQueue(String name) {
- return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);
+ protected MailQueue createMailQueue(String name, boolean useJMX) {
+ try {
+ return new ActiveMQMailQueue(connectionFactory, name, useBlob,
useJMX, log);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Unable to register MBean ", e);
+ }
}
-
}
Added:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java?rev=1034112&view=auto
==============================================================================
---
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
(added)
+++
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
Thu Nov 11 20:48:03 2010
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.api;
+
+/**
+ * Management operations of a mail queue which get exposed via JMX.
+ */
+public interface MailQueueManagementMBean {
+
+ /**
+ * Return the number of mails currently held by the queue. The JMS
+ * implementation reports -1 when the size could not be determined.
+ *
+ * @return size
+ */
+ public long getSize();
+
+ /**
+ * Mark the queued mails for immediate delivery. The JMS implementation
+ * reports -1 when the operation failed.
+ *
+ * @return count of the mails which were flushed
+ */
+ public long flush();
+
+ /**
+ * Remove mails from the queue without applying any filter. The JMS
+ * implementation reports -1 when the operation failed.
+ *
+ * @return count of the mails which were removed
+ */
+ public long clear();
+
+ /**
+ * Remove the mail with the given name.
+ *
+ * @param name the name of the mail to remove
+ * @return true if at least one mail was removed
+ */
+ public boolean removeWithName(String name);
+
+ /**
+ * Remove the mails which were sent by the given address. The JMS
+ * implementation reports -1 when the operation failed.
+ *
+ * @param address the sender address to match
+ * @return count of the mails which were removed
+ */
+ public long removeWithSender(String address);
+
+ /**
+ * Remove the mails which have the given address as recipient. The JMS
+ * implementation reports -1 when the operation failed.
+ *
+ * @param address the recipient address to match
+ * @return count of the mails which were removed
+ */
+ public long removeWithRecipient(String address);
+
+
+}
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Thu Nov 11 20:48:03 2010
@@ -21,8 +21,10 @@ package org.apache.james.queue.jms;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -39,16 +41,23 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
import javax.mail.internet.MimeMessage;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
+import org.apache.james.lifecycle.Disposable;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueManagementMBean;
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
@@ -61,19 +70,31 @@ import org.apache.mailet.MailAddress;
*
*
*/
-public class JMSMailQueue implements MailQueue, JMSSupport,
MailPrioritySupport {
+public class JMSMailQueue extends StandardMBean implements MailQueue,
JMSSupport, MailPrioritySupport, Disposable, MailQueueManagementMBean {
protected final String queuename;
protected final ConnectionFactory connectionFactory;
protected final Log logger;
+ private MBeanServer mbeanServer;
+ private String mbeanName;
+ private boolean useJMX;
+ public final static String FORCE_DELIVERY = "FORCE_DELIVERY";
- public JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
+ public JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final boolean useJMX, final Log logger) throws
NotCompliantMBeanException {
+ this(connectionFactory, queuename, useJMX, logger,
MailQueueManagementMBean.class);
+
+ }
+
+ protected JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final boolean useJMX, final Log logger, Class<?> c) throws
NotCompliantMBeanException {
+ super(c);
this.connectionFactory = connectionFactory;
this.queuename = queuename;
this.logger = logger;
- }
+ this.useJMX = useJMX;
+ registerMBean();
+ }
/**
* Execute the given {@link DequeueOperation} when a mail is ready to process. As JMS does not support delay scheduling out-of-the box, we use
* a messageselector to check if a mail is ready. For this a {@link MessageConsumer#receive(long)} is used with a timeout of 10 seconds.
@@ -160,15 +181,6 @@ public class JMSMailQueue implements Mai
}
- /**
- * Return message selector to use for consuming
- *
- * @return selector
- */
- protected String getMessageSelector() {
- return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis();
- }
-
/*
* (non-Javadoc)
*
@@ -472,5 +484,263 @@ public class JMSMailQueue implements Mai
final Mail mail = createMail(message);
return new JMSMailQueueItem(mail, connection, session, consumer);
}
+
+    /**
+     * Return the message selector to use for consuming. A message is selected
+     * when its next delivery time has been reached or when it was flagged with
+     * {@link #FORCE_DELIVERY}.
+     *
+     * @return selector
+     */
+    protected String getMessageSelector() {
+        // FORCE_DELIVERY is set via setBooleanProperty(..), so it has to be
+        // compared with the boolean literal TRUE. Comparing a boolean property
+        // with the string literal 'true' is a mixed-type comparison, which a
+        // JMS selector evaluates to false.
+        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + " OR " + FORCE_DELIVERY + " = TRUE";
+    }
+
+
+ private void registerMBean() {
+ if (useJMX) {
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ mbeanName = "org.apache.james:type=component,name=queue,queue=" +
queuename;
+ try {
+ mbeanServer.registerMBean(this, new ObjectName(mbeanName));
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to register mbean" , e);
+ }
+ }
+ }
+
+ private void unregisterMBean(){
+ if (useJMX) {
+ try {
+ mbeanServer.unregisterMBean(new ObjectName(mbeanName));
+
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to unregister mbean" , e);
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+ */
+ @SuppressWarnings("rawtypes")
+ public long getSize() {
+ Connection connection = null;
+ Session session = null;
+ QueueBrowser browser = null;
+ int size = 0;
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queuename);
+
+ browser = session.createBrowser(queue);
+
+ Enumeration messages = browser.getEnumeration();
+
+ while(messages.hasMoreElements()) {
+ messages.nextElement();
+ size++;
+ }
+ } catch (Exception e) {
+ size = -1;
+ } finally {
+ try {
+ if (browser != null)
+ browser.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+ }
+ return size;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+ */
+ public long flush() {
+ Connection connection = null;
+ Session session = null;
+ Message message = null;
+ MessageConsumer consumer = null;
+ MessageProducer producer = null;
+ boolean first = true;
+ long count = 0;
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queuename);
+ consumer = session.createConsumer(queue);
+ producer = session.createProducer(queue);
+
+ while (first || message != null) {
+ first = false;
+ message = consumer.receiveNoWait();
+ if (message != null) {
+ message.setBooleanProperty(FORCE_DELIVERY, true);
+ producer.send(message);
+ count++;
+ }
+ }
+ session.commit();
+ } catch (Exception e) {
+ count = -1;
+ try {
+ session.rollback();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ } finally {
+ if (consumer != null) {
+
+ try {
+ consumer.close();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+ if (producer != null) {
+
+ try {
+ producer.close();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+ }
+ return count;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Delegates to removeWithSelector with a null selector, so no filter is
+ * applied, and returns the count which that call reports.
+ *
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
+ */
+ public long clear() {
+ return removeWithSelector(null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Unregisters the MBean of this queue. Nothing is unregistered when the
+ * queue was created with JMX disabled.
+ *
+ * @see org.apache.james.lifecycle.Disposable#dispose()
+ */
+ public void dispose() {
+ unregisterMBean();
+ }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Removes the mails whose name property equals the given name. Returns
+     * true if at least one mail was removed.
+     *
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
+     */
+    public boolean removeWithName(String name) {
+        // A single quote ends a selector string literal, so it has to be
+        // doubled to be matched literally instead of breaking the selector
+        final String literal = String.valueOf(name).replace("'", "''");
+        return removeWithSelector(JAMES_MAIL_NAME + " = '" + literal + "'") > 0;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Removes the mails whose sender property equals the given address.
+     * Returns the count reported by removeWithSelector.
+     *
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
+     */
+    public long removeWithSender(String address) {
+        // A single quote (valid in a local-part, e.g. o'brien@example.org)
+        // ends a selector string literal, so it has to be doubled
+        final String literal = String.valueOf(address).replace("'", "''");
+        return removeWithSelector(JAMES_MAIL_SENDER + " = '" + literal + "'");
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Removes the mails whose recipients property is the given address or
+     * contains it after a comma. Returns the count reported by
+     * removeWithSelector.
+     *
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
+     */
+    public long removeWithRecipient(String address) {
+        final String raw = String.valueOf(address);
+
+        // A single quote ends a selector string literal, so it has to be doubled
+        final String exact = raw.replace("'", "''");
+
+        // '%' and '_' are wildcards only within LIKE; escape the ones which are
+        // part of the address itself so they are matched literally
+        final String pattern = raw.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_").replace("'", "''");
+        final String escape = " ESCAPE '\\'";
+
+        // NOTE(review): the patterns assume the recipients are stored as one
+        // comma separated string and do not cover an address which is the
+        // first of several recipients - TODO confirm against the code which
+        // writes JAMES_MAIL_RECIPIENTS
+        return removeWithSelector(JAMES_MAIL_RECIPIENTS + " = '" + exact + "'"
+                + " or " + JAMES_MAIL_RECIPIENTS + " LIKE '%," + pattern + "'" + escape
+                + " or " + JAMES_MAIL_RECIPIENTS + " LIKE '%," + pattern + "%'" + escape);
+    }
+
+    /**
+     * Remove the messages which match the given selector. All messages which
+     * are immediately available get consumed within one transacted session.
+     *
+     * @param selector the JMS message selector, or null to apply no filter
+     * @return count of the removed messages, or -1 if the removal failed
+     */
+    protected long removeWithSelector(String selector) {
+        Connection connection = null;
+        Session session = null;
+        MessageConsumer consumer = null;
+        long count = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue, selector);
+
+            // Consuming the message within the transaction is the removal
+            while (consumer.receiveNoWait() != null) {
+                count++;
+            }
+            session.commit();
+        } catch (Exception e) {
+            logger.error("Unable to remove mails with selector " + selector + " from queue " + queuename, e);
+            count = -1;
+            // session is still null when creating the connection or session failed
+            if (session != null) {
+                try {
+                    session.rollback();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+        } finally {
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+        }
+        return count;
+    }
}
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
Thu Nov 11 20:48:03 2010
@@ -19,12 +19,16 @@
package org.apache.james.queue.jms;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
+import javax.management.NotCompliantMBeanException;
import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.LifecycleUtil;
import org.apache.james.lifecycle.LogEnabled;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
@@ -38,9 +42,22 @@ public class JMSMailQueueFactory impleme
- private final Map<String, MailQueue> queues = new HashMap<String,
MailQueue>();
+ protected final Map<String, MailQueue> queues = new HashMap<String,
MailQueue>();
protected ConnectionFactory connectionFactory;
protected Log log;
+ private boolean useJMX = true;
+
+ /**
+ * Set if the queues created by this factory should get exposed via JMX.
+ * The value is handed to createMailQueue when a queue is requested for the
+ * first time. Default is true.
+ *
+ * @param useJMX true if JMX should get used
+ */
+ public void setUseJMX(boolean useJMX) {
+ this.useJMX = useJMX;
+ }
+
+    /**
+     * Dispose every queue which was created by this factory.
+     */
+    @PreDestroy
+    public synchronized void destroy() {
+        // Holds the same lock as getQueue(..), so the map is not modified
+        // while it gets iterated
+        for (MailQueue queue : queues.values()) {
+            LifecycleUtil.dispose(queue);
+        }
+    }
@Resource(name="jmsConnectionFactory")
public void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -55,7 +72,7 @@ public class JMSMailQueueFactory impleme
public synchronized final MailQueue getQueue(String name) {
MailQueue queue = queues.get(name);
if (queue == null) {
- queue = createMailQueue(name);
+ queue = createMailQueue(name, useJMX);
queues.put(name, queue);
}
@@ -69,8 +86,12 @@ public class JMSMailQueueFactory impleme
* @param name
* @return queue
*/
- protected MailQueue createMailQueue(String name) {
- return new JMSMailQueue(connectionFactory, name, log);
+ protected MailQueue createMailQueue(String name, boolean useJMX) {
+ try {
+ return new JMSMailQueue(connectionFactory, name, useJMX, log);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Unable to register MBean ", e);
+ }
}
/*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]