Author: norman
Date: Fri Nov 12 17:49:06 2010
New Revision: 1034492
URL: http://svn.apache.org/viewvc?rev=1034492&view=rev
Log:
Finish the JMX work for JMS/ActiveMQ (JAMES-1057)
Added:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
Fri Nov 12 17:49:06 2010
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,18 +38,17 @@ import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.SharedInputStream;
-import javax.management.NotCompliantMBeanException;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStream;
import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.core.MimeMessageSource;
import org.apache.james.core.MimeMessageWrapper;
-import org.apache.james.lifecycle.Disposable;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
@@ -83,7 +81,7 @@ import org.springframework.jms.connectio
*
*
*/
-public class ActiveMQMailQueue extends JMSMailQueue implements
ActiveMQSupport, Disposable{
+public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
private boolean useBlob;
@@ -94,8 +92,8 @@ public class ActiveMQMailQueue extends J
*
* @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) throws NotCompliantMBeanException {
- this(connectionFactory, queuename, true, true, logger);
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
+ this(connectionFactory, queuename, true, logger);
}
/**
@@ -110,68 +108,11 @@ public class ActiveMQMailQueue extends J
* @param logger
* @throws NotCompliantMBeanException
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, boolean useBlob, final boolean useJMX, final Log logger)
throws NotCompliantMBeanException {
- super(connectionFactory, queuename, useJMX, logger);
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, boolean useBlob, final Log logger) {
+ super(connectionFactory, queuename, logger);
this.useBlob = useBlob;
}
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.queue.jms.JMSMailQueue#deQueue()
- */
- /*
- public MailQueueItem deQueue() throws MailQueueException {
- Connection connection = null;
- Session session = null;
- Message message = null;
- MessageConsumer consumer = null;
-
- try {
- connection = connectionFactory.createConnection();
- connection.start();
-
- session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(queuename);
- consumer = session.createConsumer(queue);
-
- message = consumer.receive();
- return createMailQueueItem(connection, session, consumer, message);
-
- } catch (Exception e) {
- try {
- session.rollback();
- } catch (JMSException e1) {
- // ignore on rollback
- }
-
- if (consumer != null) {
-
- try {
- consumer.close();
- } catch (JMSException e1) {
- // ignore on rollback
- }
- }
- try {
- if (session != null)
- session.close();
- } catch (JMSException e1) {
- // ignore here
- }
-
- try {
- if (connection != null)
- connection.close();
- } catch (JMSException e1) {
- // ignore here
- }
- throw new MailQueueException("Unable to dequeue next message", e);
- }
-
- }
- */
-
/*
* (non-Javadoc)
*
@@ -304,96 +245,47 @@ public class ActiveMQMailQueue extends J
}
- /*
- @Override
- protected Map<String, Object> getJMSProperties(Mail mail, long
delayInMillis) throws JMSException, MessagingException {
- Map<String, Object> props = super.getJMSProperties(mail,
delayInMillis);
-
- // add JMS Property for handling message scheduling
- // http://activemq.apache.org/delay-and-schedule-message-delivery.html
- if (delayInMillis > 0) {
- props.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
- }
- return props;
- }
- */
-
@Override
protected MailQueueItem createMailQueueItem(Connection connection, Session
session, MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
Mail mail = createMail(message);
return new ActiveMQMailQueueItem(mail, connection, session, consumer,
message, logger);
}
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.jms.JMSMailQueue#removeWithSelector(java.lang.String)
- */
- protected long removeWithSelector(String selector) {
- Connection connection = null;
- Session session = null;
- Message message = null;
- MessageConsumer consumer = null;
- boolean first = true;
- long count = 0;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
-
- session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(queuename);
- consumer = session.createConsumer(queue, selector);
- List<Message> messages = new ArrayList<Message>();
- while (first || message != null) {
- first = false;
- message = consumer.receiveNoWait();
- if (message != null) {
- messages.add(message);
- count++;
- }
- }
- session.commit();
- for (int i = 0; i < messages.size(); i++) {
- Message m = messages.get(i);
- if (m instanceof ActiveMQBlobMessage) {
- try {
- ((ActiveMQBlobMessage) m).deleteFile();
- } catch (IOException e) {
- logger.error("Unable to delete blob file for message "
+m, e);
- }
- }
- }
- messages.clear();
- } catch (Exception e) {
- count = -1;
- try {
- session.rollback();
- } catch (JMSException e1) {
- // ignore on rollback
- }
- } finally {
- if (consumer != null) {
-
+
+ @Override
+ protected List<Message> removeWithSelector(String selector) throws
MailQueueException{
+ List<Message> mList = super.removeWithSelector(selector);
+
+ // Handle the blob messages
+ for (int i = 0; i < mList.size(); i++) {
+ Message m = mList.get(i);
+ if (m instanceof ActiveMQBlobMessage) {
try {
- consumer.close();
- } catch (JMSException e1) {
- // ignore on rollback
+ // Should get remove once this issue is closed:
+ // https://issues.apache.org/activemq/browse/AMQ-3018
+ ((ActiveMQBlobMessage) m).deleteFile();
+ } catch (Exception e) {
+ logger.error("Unable to delete blob file for message " +m,
e);
}
}
-
- try {
- if (session != null)
- session.close();
- } catch (JMSException e1) {
- // ignore here
- }
+ }
+ return mList;
+ }
+
+ @Override
+ protected Message copy(Session session, Message m) throws JMSException {
+ if (m instanceof ActiveMQBlobMessage) {
+ ActiveMQBlobMessage b = (ActiveMQBlobMessage)m;
+ ActiveMQBlobMessage copy = (ActiveMQBlobMessage)
getAMQSession(session).createBlobMessage(b.getURL());
try {
- if (connection != null)
- connection.close();
- } catch (JMSException e1) {
- // ignore here
+ copy.setProperties(b.getProperties());
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create("Unable to copy message " +
m, e);
}
- }
- return count;
+ return copy;
+ } else {
+ return super.copy(session, m);
+ }
}
}
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
Fri Nov 12 17:49:06 2010
@@ -19,8 +19,6 @@
package org.apache.james.queue.activemq;
-import javax.management.NotCompliantMBeanException;
-
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -44,10 +42,6 @@ public class ActiveMQMailQueueFactory ex
@Override
protected MailQueue createMailQueue(String name, boolean useJMX) {
- try {
- return new ActiveMQMailQueue(connectionFactory, name, useBlob,
useJMX, log);
- } catch (NotCompliantMBeanException e) {
- throw new RuntimeException("Unable to register MBean ", e);
- }
+ return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);
}
}
Modified:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
---
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
(original)
+++
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
Fri Nov 12 17:49:06 2010
@@ -18,19 +18,49 @@
****************************************************************/
package org.apache.james.queue.api;
+
public interface MailQueueManagementMBean {
+ /**
+ * Return the size of the queue or -1 if the size could not get calculated
+ *
+ * @return size the size or -1 if it could not get calculated
+ */
public long getSize();
+ /**
+ * Flush queue to make every Mail ready to consume.
+ *
+ * @return count the count of all flushed mails or -1 if the flush was not
possible
+ */
public long flush();
+ /**
+ * Clear the queue
+ *
+ * @return count the count of all removed mails or -1 if clear was not
possible
+ */
public long clear();
- public boolean removeWithName(String name);
-
+ /**
+ * Remove mail with name from the queue
+ *
+ * @return count the count of all removed mails or -1 if clear was not
possible
+ */
+ public long removeWithName(String name);
+
+ /**
+ * Remove mail with specific sender from the queue
+ *
+ * @return count the count of all removed mails or -1 if clear was not
possible
+ */
public long removeWithSender(String address);
+ /**
+ * Remove mail with specific recipient from the queue
+ *
+ * @return count the count of all removed mails or -1 if clear was not
possible
+ */
public long removeWithRecipient(String address);
-
}
Added:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java?rev=1034492&view=auto
==============================================================================
---
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
(added)
+++
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
Fri Nov 12 17:49:06 2010
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.api;
+
+import java.util.List;
+
+/**
+ * {@link MailQueue} which is manageable
+ *
+ */
+public interface ManageableMailQueue extends MailQueue{
+
+ public enum Type {
+ Sender,
+ Recipient,
+ Name
+ }
+
+ /**
+ * Return the size of the queue
+ *
+ * @return size
+ * @throws MailQueueException
+ */
+ public long getSize() throws MailQueueException;
+
+ /**
+ * Flush the queue, which means it will make all message ready for dequeue
+ *
+ * @return count the count of all flushed mails
+ * @throws MailQueueException
+ */
+ public long flush() throws MailQueueException;
+
+ /**
+ * Remove all mails from the queue
+ *
+ * @return count the count of all removed mails
+ * @throws MailQueueException
+ */
+ public long clear() throws MailQueueException;
+
+ /**
+ * Remove all mails from the queue that match
+ *
+ * @param type
+ * @param value
+ * @return count the count of all removed mails
+ * @throws MailQueueException
+ */
+ public long remove(Type type, String value) throws MailQueueException;
+
+ /**
+ * Return a View on the content of the queue
+ *
+ * @return content
+ */
+ public List<MailQueueItemView> view() throws MailQueueException;
+
+
+ /**
+ * A View of a {@link MailQueueItem}
+ *
+ *
+ */
+ public interface MailQueueItemView {
+ public String getName();
+ public String getSender();
+ public String[] getRecipients();
+ public long getSize();
+ public long getNextRetry();
+ }
+}
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Fri Nov 12 17:49:06 2010
@@ -21,7 +21,6 @@ package org.apache.james.queue.jms;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
@@ -46,18 +45,13 @@ import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
import javax.mail.internet.MimeMessage;
-import javax.management.MBeanServer;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.lifecycle.Disposable;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
@@ -70,30 +64,18 @@ import org.apache.mailet.MailAddress;
*
*
*/
-public class JMSMailQueue extends StandardMBean implements MailQueue,
JMSSupport, MailPrioritySupport, Disposable, MailQueueManagementMBean {
+public class JMSMailQueue implements ManageableMailQueue, JMSSupport,
MailPrioritySupport {
protected final String queuename;
protected final ConnectionFactory connectionFactory;
protected final Log logger;
- private MBeanServer mbeanServer;
- private String mbeanName;
- private boolean useJMX;
public final static String FORCE_DELIVERY = "FORCE_DELIVERY";
- public JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final boolean useJMX, final Log logger) throws
NotCompliantMBeanException {
- this(connectionFactory, queuename, useJMX, logger,
MailQueueManagementMBean.class);
-
- }
-
- protected JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final boolean useJMX, final Log logger, Class<?> c) throws
NotCompliantMBeanException {
- super(c);
+ public JMSMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
this.connectionFactory = connectionFactory;
this.queuename = queuename;
this.logger = logger;
- this.useJMX = useJMX;
- registerMBean();
-
}
/**
* Execute the given {@link DequeueOperation} when a mail is ready to
process. As JMS does not support delay scheduling out-of-the box, we use
@@ -486,39 +468,18 @@ public class JMSMailQueue extends Standa
}
protected String getMessageSelector() {
- return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + "
OR " +FORCE_DELIVERY + " ='true'";
+ return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + "
OR " + FORCE_DELIVERY + " = true";
}
- private void registerMBean() {
- if (useJMX) {
- mbeanServer = ManagementFactory.getPlatformMBeanServer();
- mbeanName = "org.apache.james:type=component,name=queue,queue=" +
queuename;
- try {
- mbeanServer.registerMBean(this, new ObjectName(mbeanName));
- } catch (Exception e) {
- throw new RuntimeException("Unable to register mbean" , e);
- }
- }
- }
-
- private void unregisterMBean(){
- if (useJMX) {
- try {
- mbeanServer.unregisterMBean(new ObjectName(mbeanName));
-
- } catch (Exception e) {
- throw new RuntimeException("Unable to unregister mbean" , e);
- }
- }
- }
-
+
+
/*
* (non-Javadoc)
- * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+ * @see org.apache.james.queue.api.ManageableMailQueue#getSize()
*/
- @SuppressWarnings("rawtypes")
- public long getSize() {
+ @SuppressWarnings("unchecked")
+ public long getSize() throws MailQueueException {
Connection connection = null;
Session session = null;
QueueBrowser browser = null;
@@ -537,9 +498,10 @@ public class JMSMailQueue extends Standa
messages.nextElement();
size++;
}
+ return size;
} catch (Exception e) {
logger.error("Unable to get size of queue " + queuename, e);
- size = -1;
+ throw new MailQueueException("Unable to get size of queue " +
queuename, e);
} finally {
try {
if (browser != null)
@@ -562,14 +524,13 @@ public class JMSMailQueue extends Standa
// ignore here
}
}
- return size;
}
/*
* (non-Javadoc)
- * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+ * @see org.apache.james.queue.api.ManageableMailQueue#flush()
*/
- public long flush() {
+ public long flush() throws MailQueueException {
Connection connection = null;
Session session = null;
Message message = null;
@@ -587,23 +548,31 @@ public class JMSMailQueue extends Standa
producer = session.createProducer(queue);
while (first || message != null) {
+ if (first) {
+ // give the consumer 2000 ms to receive messages
+ message = consumer.receive(2000);
+ } else {
+ message = consumer.receiveNoWait();
+ }
first = false;
- message = consumer.receiveNoWait();
+
if (message != null) {
- message.setBooleanProperty(FORCE_DELIVERY, true);
- producer.send(message);
+ Message m = copy(session, message);
+ m.setBooleanProperty(FORCE_DELIVERY, true);
+ producer.send(m, message.getJMSDeliveryMode(),
message.getJMSPriority(), message.getJMSExpiration());
count++;
}
}
session.commit();
+ return count;
} catch (Exception e) {
logger.error("Unable to flush mail" , e);
- count = -1;
try {
session.rollback();
} catch (JMSException e1) {
// ignore on rollback
}
+ throw new MailQueueException("Unable to get size of queue " +
queuename, e);
} finally {
if (consumer != null) {
@@ -636,65 +605,38 @@ public class JMSMailQueue extends Standa
// ignore here
}
}
- return count;
}
/*
* (non-Javadoc)
* @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
*/
- public long clear() {
- return removeWithSelector(null);
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.lifecycle.Disposable#dispose()
- */
- public void dispose() {
- unregisterMBean();
+ public long clear() throws MailQueueException {
+ return count(removeWithSelector(null));
}
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
- */
- public boolean removeWithName(String name) {
- if (removeWithSelector(JAMES_MAIL_NAME + " = '" + name +"'") > 0) {
- return true;
+ protected long count(List<Message> msgs) {
+ if (msgs == null) {
+ return -1;
+ } else {
+ return msgs.size();
}
- return false;
}
-
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
- */
- public long removeWithSender(String address) {
- return removeWithSelector(JAMES_MAIL_SENDER + " = '" + address +"'");
- }
-
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
- */
- public long removeWithRecipient(String address) {
- return removeWithSelector(JAMES_MAIL_RECIPIENTS+ " = '" + address +"'
or " + JAMES_MAIL_RECIPIENTS+ " = '%," + address + "' or " +
JAMES_MAIL_RECIPIENTS+ " = '%," + address +"%'");
- }
-
+
/**
* Remove a message with the fiven selector
*
* @param selector
* @return count
*/
- protected long removeWithSelector(String selector) {
+ protected List<Message> removeWithSelector(String selector) throws
MailQueueException{
Connection connection = null;
Session session = null;
Message message = null;
MessageConsumer consumer = null;
boolean first = true;
- long count = 0;
+ List<Message> messages = new ArrayList<Message>();
+
try {
connection = connectionFactory.createConnection();
connection.start();
@@ -703,23 +645,27 @@ public class JMSMailQueue extends Standa
Queue queue = session.createQueue(queuename);
consumer = session.createConsumer(queue, selector);
while (first || message != null) {
+ if (first) {
+ // give the consumer 2000 ms to receive messages
+ message = consumer.receive(2000);
+ } else {
+ message = consumer.receiveNoWait();
+ }
first = false;
- message = consumer.receiveNoWait();
if (message != null) {
- count++;
+ messages.add(message);
}
}
session.commit();
-
+ return messages;
} catch (Exception e) {
- logger.error("Unable to remove mails" , e);
-
- count = -1;
try {
session.rollback();
} catch (JMSException e1) {
// ignore on rollback
}
+ throw new MailQueueException("Unable to remove mails" , e);
+
} finally {
if (consumer != null) {
@@ -744,7 +690,161 @@ public class JMSMailQueue extends Standa
// ignore here
}
}
- return count;
+ }
+
+ /**
+ * Create a copy of the given {@link Message}. This includes the
properties and the payload
+ *
+ *
+ * @param session
+ * @param m
+ * @return copy
+ * @throws JMSException
+ */
+ @SuppressWarnings("unchecked")
+ protected Message copy(Session session, Message m) throws JMSException {
+ ObjectMessage message = (ObjectMessage) m;
+ ObjectMessage copy = session.createObjectMessage(message.getObject());
+
+ Enumeration<String> properties = message.getPropertyNames();
+ while (properties.hasMoreElements()) {
+ String name = properties.nextElement();
+ copy.setObjectProperty(name, message.getObjectProperty(name));
+ }
+
+ return copy;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue#remove(org.apache.james.queue.api.ManageableMailQueue.Type,
java.lang.String)
+ */
+ public long remove(Type type, String value) throws MailQueueException{
+ switch (type) {
+ case Name:
+ return count(removeWithSelector(JAMES_MAIL_NAME + " = '" + value
+"'"));
+ case Sender:
+ return count(removeWithSelector(JAMES_MAIL_SENDER + " = '" + value
+"'"));
+ case Recipient:
+ return count(removeWithSelector(JAMES_MAIL_RECIPIENTS+ " = '" +
value +"' or " + JAMES_MAIL_RECIPIENTS+ " = '%," + value + "' or " +
JAMES_MAIL_RECIPIENTS+ " = '%," + value +"%'"));
+ default:
+ break;
+ }
+ return -1;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.ManageableMailQueue#view()
+ */
+ @SuppressWarnings("unchecked")
+ public List<MailQueueItemView> view() throws MailQueueException {
+ Connection connection = null;
+ Session session = null;
+ QueueBrowser browser = null;
+ List<MailQueueItemView> view = new ArrayList<MailQueueItemView>();
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queuename);
+
+ browser = session.createBrowser(queue);
+
+ Enumeration messages = browser.getEnumeration();
+
+ while(messages.hasMoreElements()) {
+ Message m = (Message) messages.nextElement();
+ String name = m.getStringProperty(JAMES_MAIL_NAME);
+ long size = m.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+ String sender = m.getStringProperty(JAMES_MAIL_SENDER);
+ String[] recipients =
m.getStringProperty(JAMES_MAIL_RECIPIENTS).split(JAMES_MAIL_SEPARATOR);
+ long retry = m.getLongProperty(JAMES_NEXT_DELIVERY);
+ view.add(new SimpleMailQueueItemView(name, sender, recipients,
size, retry));
+ }
+ return view;
+ } catch (Exception e) {
+ logger.error("Unable to get size of queue " + queuename, e);
+ throw new MailQueueException("Unable to get size of queue " +
queuename, e);
+ } finally {
+ try {
+ if (browser != null)
+ browser.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+ }
+ }
+
+ protected class SimpleMailQueueItemView implements MailQueueItemView {
+ private String name;
+ private String sender;
+ private long size;
+ private long retry;
+ private String[] recipients;
+
+ public SimpleMailQueueItemView(String name, String sender, String[]
recipients, long size, long retry) {
+ this.name = name;
+ this.sender = sender;
+ this.recipients = recipients;
+ this.size = size;
+ this.retry = retry;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getName()
+ */
+ public String getName() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getRecipients()
+ */
+ public String[] getRecipients() {
+ return recipients;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getSender()
+ */
+ public String getSender() {
+ return sender;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getSize()
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getNextRetry()
+ */
+ public long getNextRetry() {
+ return retry;
+ }
+
}
}
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
Fri Nov 12 17:49:06 2010
@@ -18,20 +18,27 @@
****************************************************************/
package org.apache.james.queue.jms;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
-import javax.management.NotCompliantMBeanException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.james.lifecycle.LifecycleUtil;
import org.apache.james.lifecycle.LogEnabled;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
/**
* {@link MailQueueFactory} implementation which use JMS
@@ -46,17 +53,29 @@ public class JMSMailQueueFactory impleme
protected ConnectionFactory connectionFactory;
protected Log log;
private boolean useJMX = true;
+ private MBeanServer mbeanServer;
+ private List<String> mbeans = new ArrayList<String>();
public void setUseJMX(boolean useJMX) {
this.useJMX = useJMX;
}
+ @PostConstruct
+ public void init() {
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ }
+
@PreDestroy
public void destroy() {
+ for (int i = 0; i < mbeans.size(); i++) {
+ unregisterMBean(mbeans.get(i));
+ }
+
Iterator<MailQueue> it = queues.values().iterator();
while(it.hasNext()) {
LifecycleUtil.dispose(it.next());
}
+
}
@Resource(name="jmsConnectionFactory")
@@ -73,6 +92,10 @@ public class JMSMailQueueFactory impleme
MailQueue queue = queues.get(name);
if (queue == null) {
queue = createMailQueue(name, useJMX);
+ if (useJMX) {
+ registerMBean(name, queue);
+
+ }
queues.put(name, queue);
}
@@ -87,13 +110,39 @@ public class JMSMailQueueFactory impleme
* @return queue
*/
protected MailQueue createMailQueue(String name, boolean useJMX) {
- try {
- return new JMSMailQueue(connectionFactory, name, useJMX, log);
- } catch (NotCompliantMBeanException e) {
- throw new RuntimeException("Unable to register MBean ", e);
- }
+ return new JMSMailQueue(connectionFactory, name, log);
}
+ protected synchronized void registerMBean(String queuename, MailQueue
queue) {
+
+ String mbeanName = "org.apache.james:type=component,name=queue,queue="
+ queuename;
+ try {
+ MailQueueManagementMBean mbean = null;
+ if (queue instanceof ManageableMailQueue) {
+ mbean = new JMSMailQueueManagement((ManageableMailQueue)queue);
+ } else if (queue instanceof MailQueueManagementMBean) {
+ mbean = (MailQueueManagementMBean) queue;
+ }
+ if (mbean != null) {
+ mbeanServer.registerMBean(mbean, new ObjectName(mbeanName));
+ mbeans.add(mbeanName);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to register mbean" , e);
+ }
+
+ }
+
+ protected synchronized void unregisterMBean(String mbeanName){
+ try {
+ mbeanServer.unregisterMBean(new ObjectName(mbeanName));
+ mbeans.remove(mbeanName);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to unregister mbean" , e);
+ }
+
+ }
+
/*
* (non-Javadoc)
* @see
org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
Added:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java?rev=1034492&view=auto
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
(added)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
Fri Nov 12 17:49:06 2010
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.MailQueue.MailQueueException;
+import org.apache.james.queue.api.ManageableMailQueue.Type;
+
+/**
+ *
+ * JMX MBean implementation which expose management functions by wrapping a
{@link ManageableMailQueue}
+ *
+ *
+ */
+public class JMSMailQueueManagement extends StandardMBean implements
MailQueueManagementMBean{
+ private final ManageableMailQueue queue;
+
+ public JMSMailQueueManagement(ManageableMailQueue queue) throws
NotCompliantMBeanException {
+ super(MailQueueManagementMBean.class);
+ this.queue = queue;
+
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
+ */
+ public long clear() {
+ try {
+ return queue.clear();
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+ */
+ public long flush() {
+ try {
+ return queue.flush();
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+ */
+ public long getSize() {
+ try {
+ return queue.getSize();
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
+ */
+ public long removeWithName(String name) {
+ try {
+ return queue.remove(Type.Name, name);
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
+ */
+ public long removeWithRecipient(String address) {
+ try {
+ return queue.remove(Type.Recipient, address);
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
+ */
+ public long removeWithSender(String address) {
+ try {
+ return queue.remove(Type.Sender, address);
+ } catch (MailQueueException e) {
+ return -1;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]