Author: norman
Date: Thu Nov 11 20:48:03 2010
New Revision: 1034112

URL: http://svn.apache.org/viewvc?rev=1034112&view=rev
Log:
Expose JMX management for MailQueue

Added:
    
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
Modified:
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 Thu Nov 11 20:48:03 2010
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.Connection;
@@ -37,15 +39,18 @@ import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 import javax.mail.internet.SharedInputStream;
+import javax.management.NotCompliantMBeanException;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
+import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStream;
 import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.core.MimeMessageSource;
 import org.apache.james.core.MimeMessageWrapper;
+import org.apache.james.lifecycle.Disposable;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
@@ -78,17 +83,19 @@ import org.springframework.jms.connectio
  * 
  * 
  */
-public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
+public class ActiveMQMailQueue extends JMSMailQueue implements 
ActiveMQSupport, Disposable{
     
     private boolean useBlob;
-
+    
+    
     /**
      * Construct a {...@link ActiveMQMailQueue} which only use {...@link 
BlobMessage}
+     * @throws NotCompliantMBeanException 
      * 
      * @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
-        this(connectionFactory, queuename, true, logger);
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) throws NotCompliantMBeanException {
+        this(connectionFactory, queuename, true, true, logger);
     }
     
     /**
@@ -101,11 +108,13 @@ public class ActiveMQMailQueue extends J
      * @param queuename
      * @param useBlob
      * @param logger
+     * @throws NotCompliantMBeanException 
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, boolean useBlob, final Log logger) {
-        super(connectionFactory, queuename, logger);
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, boolean useBlob, final boolean useJMX, final Log logger) 
throws NotCompliantMBeanException {
+        super(connectionFactory, queuename, useJMX, logger);
         this.useBlob = useBlob;
     }
+
     
     /*
      * (non-Javadoc)
@@ -315,4 +324,76 @@ public class ActiveMQMailQueue extends J
         return new ActiveMQMailQueueItem(mail, connection, session, consumer, 
message, logger);
     }
 
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.jms.JMSMailQueue#removeWithSelector(java.lang.String)
+     */
+    protected long removeWithSelector(String selector) {
+        Connection connection = null;
+        Session session = null;
+        Message message = null;
+        MessageConsumer consumer = null;
+        boolean first = true;
+        long count = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue, selector);
+            List<Message> messages = new ArrayList<Message>();
+            while (first || message != null) {
+                first = false;
+                message = consumer.receiveNoWait();
+                if (message != null) {
+                    messages.add(message);
+                    count++;
+                }
+            }
+            session.commit();
+            for (int i = 0; i < messages.size(); i++) {
+                Message m = messages.get(i);
+                if (m instanceof ActiveMQBlobMessage) {
+                    try {
+                        ((ActiveMQBlobMessage) m).deleteFile();
+                    } catch (IOException e) {
+                        logger.error("Unable to delete blob file for message " 
+m, e);
+                    }
+                }
+            }
+            messages.clear();
+        } catch (Exception e) {
+            count = -1;
+            try {
+                session.rollback();
+            } catch (JMSException e1) {
+                // ignore on rollback
+            }
+        } finally {
+            if (consumer != null) {
+
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+            
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+        }    
+        return count;
+    }
 }

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 Thu Nov 11 20:48:03 2010
@@ -19,6 +19,8 @@
 package org.apache.james.queue.activemq;
 
 
+import javax.management.NotCompliantMBeanException;
+
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -38,10 +40,14 @@ public class ActiveMQMailQueueFactory ex
     public void setUseBlobMessages(boolean useBlob){
         this.useBlob = useBlob;
     }
+
     
     @Override
-    protected MailQueue createMailQueue(String name) {
-        return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);
+    protected MailQueue createMailQueue(String name, boolean useJMX) {
+        try {
+            return new ActiveMQMailQueue(connectionFactory, name, useBlob, 
useJMX, log);
+        } catch (NotCompliantMBeanException e) {
+            throw new RuntimeException("Unable to register MBean ", e);
+        }
     }
-
 }

Added: 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java?rev=1034112&view=auto
==============================================================================
--- 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
 (added)
+++ 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
 Thu Nov 11 20:48:03 2010
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.api;
+
+public interface MailQueueManagementMBean {
+
+    public long getSize();
+    
+    public long flush();
+    
+    public long clear();
+    
+    public boolean removeWithName(String name);
+    
+    public long removeWithSender(String address);
+    
+    public long removeWithRecipient(String address);
+
+    
+}

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 Thu Nov 11 20:48:03 2010
@@ -21,8 +21,10 @@ package org.apache.james.queue.jms;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -39,16 +41,23 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.MimeMessage;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
 
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
+import org.apache.james.lifecycle.Disposable;
 import org.apache.james.queue.api.MailPrioritySupport;
 import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueManagementMBean;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
 
@@ -61,19 +70,31 @@ import org.apache.mailet.MailAddress;
  * 
  * 
  */
-public class JMSMailQueue implements MailQueue, JMSSupport, 
MailPrioritySupport {
+public class JMSMailQueue extends StandardMBean implements MailQueue, 
JMSSupport, MailPrioritySupport, Disposable, MailQueueManagementMBean {
 
     protected final String queuename;
     protected final ConnectionFactory connectionFactory;
     protected final Log logger;
+    private MBeanServer mbeanServer;
+    private String mbeanName;
+    private boolean useJMX;
+    public final static String FORCE_DELIVERY = "FORCE_DELIVERY";
 
 
-    public JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
+    public JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final boolean useJMX, final Log logger) throws 
NotCompliantMBeanException {
+        this(connectionFactory, queuename, useJMX, logger, 
MailQueueManagementMBean.class);
+
+    }
+
+    protected JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final boolean useJMX, final Log logger, Class<?> c) throws 
NotCompliantMBeanException {
+        super(c);
         this.connectionFactory = connectionFactory;
         this.queuename = queuename;
         this.logger = logger;
-    }
+        this.useJMX = useJMX;
+        registerMBean();
 
+    }
 /**
      * Execute the given {...@link DequeueOperation} when a mail is ready to 
process. As JMS does not support delay scheduling out-of-the box, we use 
      * a messageselector to check if a mail is ready. For this a {...@link 
MessageConsumer#receive(long) is used with a timeout of 10 seconds. 
@@ -160,15 +181,6 @@ public class JMSMailQueue implements Mai
 
     }
 
-    /**
-     * Return message selector to use for consuming
-     * 
-     * @return selector
-     */
-    protected String getMessageSelector() {
-        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis();
-    }
-    
     /*
      * (non-Javadoc)
      * 
@@ -472,5 +484,263 @@ public class JMSMailQueue implements Mai
         final Mail mail = createMail(message);
         return new JMSMailQueueItem(mail, connection, session, consumer);
     }
+    
+    protected String getMessageSelector() {
+        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + " 
OR " +FORCE_DELIVERY + " ='true'";
+    }
+
+    
+    private void registerMBean() {
+        if (useJMX) {
+            mbeanServer = ManagementFactory.getPlatformMBeanServer(); 
+            mbeanName = "org.apache.james:type=component,name=queue,queue=" + 
queuename;
+            try {
+                mbeanServer.registerMBean(this, new ObjectName(mbeanName));
+            } catch (Exception e) {
+                throw new RuntimeException("Unable to register mbean" , e);
+            }
+        }
+    }
+    
+    private void unregisterMBean(){
+        if (useJMX) {
+            try {
+                mbeanServer.unregisterMBean(new ObjectName(mbeanName));
+           
+            } catch (Exception e) {
+                throw new RuntimeException("Unable to unregister mbean" , e);
+            }
+        }
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+     */
+    @SuppressWarnings("rawtypes")
+    public long getSize() {
+        Connection connection = null;
+        Session session = null;
+        QueueBrowser browser = null;
+        int size = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(queuename);
+
+            browser = session.createBrowser(queue);
+            
+            Enumeration messages = browser.getEnumeration();
+            
+            while(messages.hasMoreElements()) {
+                messages.nextElement();
+                size++;
+            }
+        } catch (Exception e) {
+            size = -1;
+        } finally {
+            try {
+                if (browser != null)
+                    browser.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+        }
+        return size;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+     */
+    public long flush() {
+        Connection connection = null;
+        Session session = null;
+        Message message = null;
+        MessageConsumer consumer = null;
+        MessageProducer producer = null;
+        boolean first = true;
+        long count = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue);
+            producer = session.createProducer(queue);
+
+            while (first || message != null) {
+                first = false;
+                message = consumer.receiveNoWait();
+                if (message != null) {
+                    message.setBooleanProperty(FORCE_DELIVERY, true);
+                    producer.send(message);
+                    count++;
+                }
+            }
+            session.commit();
+        } catch (Exception e) {
+            count = -1;
+            try {
+                session.rollback();
+            } catch (JMSException e1) {
+                // ignore on rollback
+            }
+        } finally {
+            if (consumer != null) {
+
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+            if (producer != null) {
+
+                try {
+                    producer.close();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+            
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+        }   
+        return count;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
+     */
+    public long clear() {
+        return removeWithSelector(null);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.lifecycle.Disposable#dispose()
+     */
+    public void dispose() {
+        unregisterMBean();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
+     */
+    public boolean removeWithName(String name) {
+        if (removeWithSelector(JAMES_MAIL_NAME + " = '" + name +"'") > 0) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
+     */
+    public long removeWithSender(String address) {
+        return removeWithSelector(JAMES_MAIL_SENDER + " = '" + address +"'");
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
+     */
+    public long removeWithRecipient(String address) {
+        return removeWithSelector(JAMES_MAIL_RECIPIENTS+ " = '" + address +"' 
or " + JAMES_MAIL_RECIPIENTS+ " = '%," + address + "' or " + 
JAMES_MAIL_RECIPIENTS+ " = '%," + address +"%'");
+    }
+
+    /**
+     * Remove a message with the fiven selector
+     * 
+     * @param selector
+     * @return count
+     */
+    protected long removeWithSelector(String selector) {
+        Connection connection = null;
+        Session session = null;
+        Message message = null;
+        MessageConsumer consumer = null;
+        boolean first = true;
+        long count = 0;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue, selector);
+            while (first || message != null) {
+                first = false;
+                message = consumer.receiveNoWait();
+                if (message != null) {
+                    count++;
+                }
+            }
+            session.commit();
+            
+        } catch (Exception e) {
+            count = -1;
+            try {
+                session.rollback();
+            } catch (JMSException e1) {
+                // ignore on rollback
+            }
+        } finally {
+            if (consumer != null) {
+
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+            
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+        }    
+        return count;
+    }
 
 }

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java?rev=1034112&r1=1034111&r2=1034112&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
 Thu Nov 11 20:48:03 2010
@@ -19,12 +19,16 @@
 package org.apache.james.queue.jms;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import javax.jms.ConnectionFactory;
+import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.LifecycleUtil;
 import org.apache.james.lifecycle.LogEnabled;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -38,9 +42,22 @@ public class JMSMailQueueFactory impleme
 
     
     
-    private final Map<String, MailQueue> queues = new HashMap<String, 
MailQueue>();
+    protected final Map<String, MailQueue> queues = new HashMap<String, 
MailQueue>();
     protected ConnectionFactory connectionFactory;
     protected Log log;
+    private boolean useJMX = true;
+    
+    public void setUseJMX(boolean useJMX) {
+        this.useJMX = useJMX;
+    }
+    
+    @PreDestroy
+    public void destroy() {
+        Iterator<MailQueue> it = queues.values().iterator();
+        while(it.hasNext()) {
+            LifecycleUtil.dispose(it.next());
+        }
+    }
     
     @Resource(name="jmsConnectionFactory")
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -55,7 +72,7 @@ public class JMSMailQueueFactory impleme
     public synchronized final MailQueue getQueue(String name) {
         MailQueue queue = queues.get(name);
         if (queue == null) {
-            queue = createMailQueue(name);
+            queue = createMailQueue(name, useJMX);
             queues.put(name, queue);
         }
 
@@ -69,8 +86,12 @@ public class JMSMailQueueFactory impleme
      * @param name
      * @return queue
      */
-    protected MailQueue createMailQueue(String name) {
-       return new JMSMailQueue(connectionFactory, name, log);
+    protected MailQueue createMailQueue(String name, boolean useJMX) {
+       try {
+            return new JMSMailQueue(connectionFactory, name, useJMX, log);
+        } catch (NotCompliantMBeanException e) {
+            throw new RuntimeException("Unable to register MBean ", e);
+        }
     }
 
     /*



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to