Author: norman
Date: Fri Nov 12 17:49:06 2010
New Revision: 1034492

URL: http://svn.apache.org/viewvc?rev=1034492&view=rev
Log:
Finish the JMX work for JMS/ActiveMQ (JAMES-1057)

Added:
    
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
Modified:
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
    
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 Fri Nov 12 17:49:06 2010
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,18 +38,17 @@ import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 import javax.mail.internet.SharedInputStream;
-import javax.management.NotCompliantMBeanException;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
 import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStream;
 import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.core.MimeMessageSource;
 import org.apache.james.core.MimeMessageWrapper;
-import org.apache.james.lifecycle.Disposable;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
@@ -83,7 +81,7 @@ import org.springframework.jms.connectio
  * 
  * 
  */
-public class ActiveMQMailQueue extends JMSMailQueue implements 
ActiveMQSupport, Disposable{
+public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
     
     private boolean useBlob;
     
@@ -94,8 +92,8 @@ public class ActiveMQMailQueue extends J
      * 
      * @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) throws NotCompliantMBeanException {
-        this(connectionFactory, queuename, true, true, logger);
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
+        this(connectionFactory, queuename, true, logger);
     }
     
     /**
@@ -110,68 +108,11 @@ public class ActiveMQMailQueue extends J
      * @param logger
      * @throws NotCompliantMBeanException 
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, boolean useBlob, final boolean useJMX, final Log logger) 
throws NotCompliantMBeanException {
-        super(connectionFactory, queuename, useJMX, logger);
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, boolean useBlob, final Log logger) {
+        super(connectionFactory, queuename, logger);
         this.useBlob = useBlob;
     }
 
-    
-    /*
-     * (non-Javadoc)
-     * @see org.apache.james.queue.jms.JMSMailQueue#deQueue()
-     */
-    /*
-    public MailQueueItem deQueue() throws MailQueueException {
-        Connection connection = null;
-        Session session = null;
-        Message message = null;
-        MessageConsumer consumer = null;
-
-        try {
-            connection = connectionFactory.createConnection();
-            connection.start();
-
-            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue(queuename);
-            consumer = session.createConsumer(queue);
-
-            message = consumer.receive();
-            return createMailQueueItem(connection, session, consumer, message);
-
-        } catch (Exception e) {
-            try {
-                session.rollback();
-            } catch (JMSException e1) {
-                // ignore on rollback
-            }
-
-            if (consumer != null) {
-
-                try {
-                    consumer.close();
-                } catch (JMSException e1) {
-                    // ignore on rollback
-                }
-            }
-            try {
-                if (session != null)
-                    session.close();
-            } catch (JMSException e1) {
-                // ignore here
-            }
-
-            try {
-                if (connection != null)
-                    connection.close();
-            } catch (JMSException e1) {
-                // ignore here
-            }
-            throw new MailQueueException("Unable to dequeue next message", e);
-        }
-
-    }
-    */
-
     /*
      * (non-Javadoc)
      * 
@@ -304,96 +245,47 @@ public class ActiveMQMailQueue extends J
     }
     
 
-    /*
-    @Override
-    protected Map<String, Object> getJMSProperties(Mail mail, long 
delayInMillis) throws JMSException, MessagingException {
-        Map<String, Object> props =  super.getJMSProperties(mail, 
delayInMillis);
-       
-        // add JMS Property for handling message scheduling
-        // http://activemq.apache.org/delay-and-schedule-message-delivery.html
-        if (delayInMillis > 0) {
-            props.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
-        }
-        return props;
-    }
-    */
-
     @Override
     protected MailQueueItem createMailQueueItem(Connection connection, Session 
session, MessageConsumer consumer, Message message) throws JMSException, 
MessagingException {
         Mail mail = createMail(message);
         return new ActiveMQMailQueueItem(mail, connection, session, consumer, 
message, logger);
     }
 
-    /*
-     * (non-Javadoc)
-     * @see 
org.apache.james.queue.jms.JMSMailQueue#removeWithSelector(java.lang.String)
-     */
-    protected long removeWithSelector(String selector) {
-        Connection connection = null;
-        Session session = null;
-        Message message = null;
-        MessageConsumer consumer = null;
-        boolean first = true;
-        long count = 0;
-        try {
-            connection = connectionFactory.createConnection();
-            connection.start();
-
-            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue(queuename);
-            consumer = session.createConsumer(queue, selector);
-            List<Message> messages = new ArrayList<Message>();
-            while (first || message != null) {
-                first = false;
-                message = consumer.receiveNoWait();
-                if (message != null) {
-                    messages.add(message);
-                    count++;
-                }
-            }
-            session.commit();
-            for (int i = 0; i < messages.size(); i++) {
-                Message m = messages.get(i);
-                if (m instanceof ActiveMQBlobMessage) {
-                    try {
-                        ((ActiveMQBlobMessage) m).deleteFile();
-                    } catch (IOException e) {
-                        logger.error("Unable to delete blob file for message " 
+m, e);
-                    }
-                }
-            }
-            messages.clear();
-        } catch (Exception e) {
-            count = -1;
-            try {
-                session.rollback();
-            } catch (JMSException e1) {
-                // ignore on rollback
-            }
-        } finally {
-            if (consumer != null) {
-
+    
+    @Override
+    protected List<Message> removeWithSelector(String selector) throws 
MailQueueException{
+        List<Message> mList = super.removeWithSelector(selector);
+        
+        // Handle the blob messages
+        for (int i = 0; i < mList.size(); i++) {
+            Message m = mList.get(i);
+            if (m instanceof ActiveMQBlobMessage) {
                 try {
-                    consumer.close();
-                } catch (JMSException e1) {
-                    // ignore on rollback
+                    // Should get remove once this issue is closed:
+                    // https://issues.apache.org/activemq/browse/AMQ-3018
+                    ((ActiveMQBlobMessage) m).deleteFile();
+                } catch (Exception e) {
+                    logger.error("Unable to delete blob file for message " +m, 
e);
                 }
             }
-            
-            try {
-                if (session != null)
-                    session.close();
-            } catch (JMSException e1) {
-                // ignore here
-            }
+        }
+        return mList;
+    }
 
+    
+    @Override
+    protected Message copy(Session session, Message m) throws JMSException {
+        if (m instanceof ActiveMQBlobMessage) {
+            ActiveMQBlobMessage b = (ActiveMQBlobMessage)m;
+            ActiveMQBlobMessage copy = (ActiveMQBlobMessage) 
getAMQSession(session).createBlobMessage(b.getURL());
             try {
-                if (connection != null)
-                    connection.close();
-            } catch (JMSException e1) {
-                // ignore here
+                copy.setProperties(b.getProperties());
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create("Unable to copy message " + 
m, e);
             }
-        }    
-        return count;
+            return copy;
+        } else {
+            return super.copy(session, m);
+        }
     }
 }

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 Fri Nov 12 17:49:06 2010
@@ -19,8 +19,6 @@
 package org.apache.james.queue.activemq;
 
 
-import javax.management.NotCompliantMBeanException;
-
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -44,10 +42,6 @@ public class ActiveMQMailQueueFactory ex
     
     @Override
     protected MailQueue createMailQueue(String name, boolean useJMX) {
-        try {
-            return new ActiveMQMailQueue(connectionFactory, name, useBlob, 
useJMX, log);
-        } catch (NotCompliantMBeanException e) {
-            throw new RuntimeException("Unable to register MBean ", e);
-        }
+        return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);  
     }
 }

Modified: 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
--- 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
 (original)
+++ 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueueManagementMBean.java
 Fri Nov 12 17:49:06 2010
@@ -18,19 +18,49 @@
  ****************************************************************/
 package org.apache.james.queue.api;
 
+
 public interface MailQueueManagementMBean {
 
+    /**
+     * Return the size of the queue or -1 if the size could not get calculated
+     * 
+     * @return size the size or -1 if it could not get calculated
+     */
     public long getSize();
     
+    /**
+     * Flush queue to make every Mail ready to consume. 
+     * 
+     * @return count the count of all flushed mails or -1 if the flush was not 
possible
+     */
     public long flush();
     
+    /**
+     * Clear the queue
+     * 
+     * @return count the count of all removed mails or -1 if clear was not 
possible
+     */
     public long clear();
     
-    public boolean removeWithName(String name);
-    
+    /**
+     * Remove mail with name from the queue
+     * 
+     * @return count the count of all removed mails or -1 if clear was not 
possible
+     */
+    public long removeWithName(String name);
+    
+    /**
+     * Remove mail with specific sender from the queue
+     * 
+     * @return count the count of all removed mails or -1 if clear was not 
possible
+     */
     public long removeWithSender(String address);
     
+    /**
+     * Remove mail with specific recipient from the queue
+     * 
+     * @return count the count of all removed mails or -1 if clear was not 
possible
+     */
     public long removeWithRecipient(String address);
-
     
 }

Added: 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java?rev=1034492&view=auto
==============================================================================
--- 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
 (added)
+++ 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
 Fri Nov 12 17:49:06 2010
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.api;
+
+import java.util.List;
+
+/**
+ * {...@link MailQueue} which is manageable
+ *
+ */
+public interface ManageableMailQueue extends MailQueue{
+
+    public enum Type {
+        Sender,
+        Recipient,
+        Name
+    }
+    
+    /**
+     * Return the size of the queue
+     * 
+     * @return size 
+     * @throws MailQueueException
+     */
+    public long getSize() throws MailQueueException;
+    
+    /**
+     * Flush the queue, which means it will make all message ready for dequeue
+     * 
+     * @return count the count of all flushed mails
+     * @throws MailQueueException
+     */
+    public long flush() throws MailQueueException;
+    
+    /**
+     * Remove all mails from the queue
+     * 
+     * @return count the count of all removed mails
+     * @throws MailQueueException
+     */
+    public long clear() throws MailQueueException;
+    
+    /**
+     * Remove all mails from the queue that match
+     * 
+     * @param type
+     * @param value
+     * @return count the count of all removed mails
+     * @throws MailQueueException
+     */
+    public long remove(Type type, String value) throws MailQueueException;
+    
+    /**
+     * Return a View on the content of the queue
+     * 
+     * @return content
+     */
+    public List<MailQueueItemView> view() throws MailQueueException;
+    
+    
+    /**
+     * A View of a {...@link MailQueueItem}
+     * 
+     *
+     */
+    public interface MailQueueItemView {
+        public String getName();
+        public String getSender();
+        public String[] getRecipients();
+        public long getSize();
+        public long getNextRetry();
+    }
+}

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 Fri Nov 12 17:49:06 2010
@@ -21,7 +21,6 @@ package org.apache.james.queue.jms;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Enumeration;
@@ -46,18 +45,13 @@ import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.MimeMessage;
-import javax.management.MBeanServer;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
 
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.lifecycle.Disposable;
 import org.apache.james.queue.api.MailPrioritySupport;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
 
@@ -70,30 +64,18 @@ import org.apache.mailet.MailAddress;
  * 
  * 
  */
-public class JMSMailQueue extends StandardMBean implements MailQueue, 
JMSSupport, MailPrioritySupport, Disposable, MailQueueManagementMBean {
+public class JMSMailQueue implements ManageableMailQueue, JMSSupport, 
MailPrioritySupport {
 
     protected final String queuename;
     protected final ConnectionFactory connectionFactory;
     protected final Log logger;
-    private MBeanServer mbeanServer;
-    private String mbeanName;
-    private boolean useJMX;
     public final static String FORCE_DELIVERY = "FORCE_DELIVERY";
 
 
-    public JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final boolean useJMX, final Log logger) throws 
NotCompliantMBeanException {
-        this(connectionFactory, queuename, useJMX, logger, 
MailQueueManagementMBean.class);
-
-    }
-
-    protected JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final boolean useJMX, final Log logger, Class<?> c) throws 
NotCompliantMBeanException {
-        super(c);
+    public JMSMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
         this.connectionFactory = connectionFactory;
         this.queuename = queuename;
         this.logger = logger;
-        this.useJMX = useJMX;
-        registerMBean();
-
     }
 /**
      * Execute the given {...@link DequeueOperation} when a mail is ready to 
process. As JMS does not support delay scheduling out-of-the box, we use 
@@ -486,39 +468,18 @@ public class JMSMailQueue extends Standa
     }
     
     protected String getMessageSelector() {
-        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + " 
OR " +FORCE_DELIVERY + " ='true'";
+        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + " 
OR " + FORCE_DELIVERY + " = true";
     }
 
     
-    private void registerMBean() {
-        if (useJMX) {
-            mbeanServer = ManagementFactory.getPlatformMBeanServer(); 
-            mbeanName = "org.apache.james:type=component,name=queue,queue=" + 
queuename;
-            try {
-                mbeanServer.registerMBean(this, new ObjectName(mbeanName));
-            } catch (Exception e) {
-                throw new RuntimeException("Unable to register mbean" , e);
-            }
-        }
-    }
-    
-    private void unregisterMBean(){
-        if (useJMX) {
-            try {
-                mbeanServer.unregisterMBean(new ObjectName(mbeanName));
-           
-            } catch (Exception e) {
-                throw new RuntimeException("Unable to unregister mbean" , e);
-            }
-        }
-    }
-    
+   
+
     /*
      * (non-Javadoc)
-     * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+     * @see org.apache.james.queue.api.ManageableMailQueue#getSize()
      */
-    @SuppressWarnings("rawtypes")
-    public long getSize() {
+    @SuppressWarnings("unchecked")
+    public long getSize() throws MailQueueException {
         Connection connection = null;
         Session session = null;
         QueueBrowser browser = null;
@@ -537,9 +498,10 @@ public class JMSMailQueue extends Standa
                 messages.nextElement();
                 size++;
             }
+            return size;
         } catch (Exception e) {
             logger.error("Unable to get size of queue " + queuename, e);
-            size = -1;
+            throw new MailQueueException("Unable to get size of queue " + 
queuename, e);
         } finally {
             try {
                 if (browser != null)
@@ -562,14 +524,13 @@ public class JMSMailQueue extends Standa
                 // ignore here
             }
         }
-        return size;
     }
 
     /*
      * (non-Javadoc)
-     * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+     * @see org.apache.james.queue.api.ManageableMailQueue#flush()
      */
-    public long flush() {
+    public long flush() throws MailQueueException {
         Connection connection = null;
         Session session = null;
         Message message = null;
@@ -587,23 +548,31 @@ public class JMSMailQueue extends Standa
             producer = session.createProducer(queue);
 
             while (first || message != null) {
+                if (first) {
+                    // give the consumer 2000 ms to receive messages
+                    message = consumer.receive(2000);
+                } else {
+                    message = consumer.receiveNoWait();
+                }
                 first = false;
-                message = consumer.receiveNoWait();
+                
                 if (message != null) {
-                    message.setBooleanProperty(FORCE_DELIVERY, true);
-                    producer.send(message);
+                    Message m = copy(session, message);
+                    m.setBooleanProperty(FORCE_DELIVERY, true);
+                    producer.send(m, message.getJMSDeliveryMode(), 
message.getJMSPriority(), message.getJMSExpiration());
                     count++;
                 }
             }
             session.commit();
+            return count;
         } catch (Exception e) {
             logger.error("Unable to flush mail" , e);
-            count = -1;
             try {
                 session.rollback();
             } catch (JMSException e1) {
                 // ignore on rollback
             }
+            throw new MailQueueException("Unable to get size of queue " + 
queuename, e);
         } finally {
             if (consumer != null) {
 
@@ -636,65 +605,38 @@ public class JMSMailQueue extends Standa
                 // ignore here
             }
         }   
-        return count;
     }
 
     /*
      * (non-Javadoc)
      * @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
      */
-    public long clear() {
-        return removeWithSelector(null);
-    }
-
-    /*
-     * (non-Javadoc)
-     * @see org.apache.james.lifecycle.Disposable#dispose()
-     */
-    public void dispose() {
-        unregisterMBean();
+    public long clear() throws MailQueueException {
+        return count(removeWithSelector(null));
     }
 
-    /*
-     * (non-Javadoc)
-     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
-     */
-    public boolean removeWithName(String name) {
-        if (removeWithSelector(JAMES_MAIL_NAME + " = '" + name +"'") > 0) {
-            return true;
+    protected long count(List<Message> msgs) {
+        if (msgs == null) {
+            return -1;
+        } else {
+            return msgs.size();
         }
-        return false;
     }
-
-    /*
-     * (non-Javadoc)
-     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
-     */
-    public long removeWithSender(String address) {
-        return removeWithSelector(JAMES_MAIL_SENDER + " = '" + address +"'");
-    }
-
-    /*
-     * (non-Javadoc)
-     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
-     */
-    public long removeWithRecipient(String address) {
-        return removeWithSelector(JAMES_MAIL_RECIPIENTS+ " = '" + address +"' 
or " + JAMES_MAIL_RECIPIENTS+ " = '%," + address + "' or " + 
JAMES_MAIL_RECIPIENTS+ " = '%," + address +"%'");
-    }
-
+    
     /**
      * Remove a message with the fiven selector
      * 
      * @param selector
      * @return count
      */
-    protected long removeWithSelector(String selector) {
+    protected List<Message> removeWithSelector(String selector) throws 
MailQueueException{
         Connection connection = null;
         Session session = null;
         Message message = null;
         MessageConsumer consumer = null;
         boolean first = true;
-        long count = 0;
+        List<Message> messages = new ArrayList<Message>();
+        
         try {
             connection = connectionFactory.createConnection();
             connection.start();
@@ -703,23 +645,27 @@ public class JMSMailQueue extends Standa
             Queue queue = session.createQueue(queuename);
             consumer = session.createConsumer(queue, selector);
             while (first || message != null) {
+                if (first) {
+                    // give the consumer 2000 ms to receive messages
+                    message = consumer.receive(2000);
+                } else {
+                    message = consumer.receiveNoWait();
+                }
                 first = false;
-                message = consumer.receiveNoWait();
                 if (message != null) {
-                    count++;
+                    messages.add(message);
                 }
             }
             session.commit();
-            
+            return messages;
         } catch (Exception e) {
-            logger.error("Unable to remove mails" , e);
-
-            count = -1;
             try {
                 session.rollback();
             } catch (JMSException e1) {
                 // ignore on rollback
             }
+            throw new MailQueueException("Unable to remove mails" , e);
+
         } finally {
             if (consumer != null) {
 
@@ -744,7 +690,161 @@ public class JMSMailQueue extends Standa
                 // ignore here
             }
         }    
-        return count;
+    }
+    
+    /**
+     * Create a copy of the given {...@link Message}. This includes the 
properties and the payload
+     * 
+     * 
+     * @param session
+     * @param m 
+     * @return copy
+     * @throws JMSException
+     */
+    @SuppressWarnings("unchecked")
+    protected Message copy(Session session, Message m) throws JMSException {
+        ObjectMessage message = (ObjectMessage) m;
+        ObjectMessage copy = session.createObjectMessage(message.getObject());
+
+        Enumeration<String> properties = message.getPropertyNames();
+        while (properties.hasMoreElements()) {
+            String name = properties.nextElement();
+            copy.setObjectProperty(name, message.getObjectProperty(name));
+        }
+                
+        return copy;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.ManageableMailQueue#remove(org.apache.james.queue.api.ManageableMailQueue.Type,
 java.lang.String)
+     */
+    public long remove(Type type, String value) throws MailQueueException{
+        switch (type) {
+        case Name:
+            return count(removeWithSelector(JAMES_MAIL_NAME + " = '" + value 
+"'"));
+        case Sender:
+            return count(removeWithSelector(JAMES_MAIL_SENDER + " = '" + value 
+"'"));
+        case Recipient:
+            return count(removeWithSelector(JAMES_MAIL_RECIPIENTS+ " = '" + 
value +"' or " + JAMES_MAIL_RECIPIENTS+ " = '%," + value + "' or " + 
JAMES_MAIL_RECIPIENTS+ " = '%," + value +"%'"));
+        default:
+            break;
+        }
+        return -1;
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.ManageableMailQueue#view()
+     */
+    @SuppressWarnings("unchecked")
+    public List<MailQueueItemView> view() throws MailQueueException {
+        Connection connection = null;
+        Session session = null;
+        QueueBrowser browser = null;
+        List<MailQueueItemView> view = new ArrayList<MailQueueItemView>();
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(queuename);
+
+            browser = session.createBrowser(queue);
+            
+            Enumeration messages = browser.getEnumeration();
+            
+            while(messages.hasMoreElements()) {
+                Message m = (Message) messages.nextElement();
+                String name = m.getStringProperty(JAMES_MAIL_NAME);
+                long size = m.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+                String sender = m.getStringProperty(JAMES_MAIL_SENDER);
+                String[] recipients = 
m.getStringProperty(JAMES_MAIL_RECIPIENTS).split(JAMES_MAIL_SEPARATOR);
+                long retry = m.getLongProperty(JAMES_NEXT_DELIVERY);
+                view.add(new SimpleMailQueueItemView(name, sender, recipients, 
size, retry));
+            }
+            return view;
+        } catch (Exception e) {
+            logger.error("Unable to get size of queue " + queuename, e);
+            throw new MailQueueException("Unable to get size of queue " + 
queuename, e);
+        } finally {
+            try {
+                if (browser != null)
+                    browser.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+        }        
+    }
+    
+    protected class SimpleMailQueueItemView implements MailQueueItemView {
+        private String name;
+        private String sender;
+        private long size;
+        private long retry;
+        private String[] recipients;
+
+        public SimpleMailQueueItemView(String name, String sender, String[] 
recipients, long size, long retry) {
+            this.name = name;
+            this.sender = sender;
+            this.recipients = recipients;
+            this.size = size;
+            this.retry = retry;
+        }
+        
+        /*
+         * (non-Javadoc)
+         * @see 
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getName()
+         */
+        public String getName() {
+            return name;
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see 
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getRecipients()
+         */
+        public String[] getRecipients() {
+            return recipients;
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see 
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getSender()
+         */
+        public String getSender() {
+            return sender;
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see 
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getSize()
+         */
+        public long getSize() {
+            return size;
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see 
org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView#getNextRetry()
+         */
+        public long getNextRetry() {
+            return retry;
+        }
+        
     }
 
 }

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java?rev=1034492&r1=1034491&r2=1034492&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
 Fri Nov 12 17:49:06 2010
@@ -18,20 +18,27 @@
  ****************************************************************/
 package org.apache.james.queue.jms;
 
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import javax.jms.ConnectionFactory;
-import javax.management.NotCompliantMBeanException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import org.apache.commons.logging.Log;
 import org.apache.james.lifecycle.LifecycleUtil;
 import org.apache.james.lifecycle.LogEnabled;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
 
 /**
  * {...@link MailQueueFactory} implementation which use JMS
@@ -46,17 +53,29 @@ public class JMSMailQueueFactory impleme
     protected ConnectionFactory connectionFactory;
     protected Log log;
     private boolean useJMX = true;
+    private MBeanServer mbeanServer;
+    private List<String> mbeans = new ArrayList<String>();
     
     public void setUseJMX(boolean useJMX) {
         this.useJMX = useJMX;
     }
     
+    @PostConstruct
+    public void init() {
+        mbeanServer = ManagementFactory.getPlatformMBeanServer(); 
+    }
+    
     @PreDestroy
     public void destroy() {
+        for (int i = 0; i < mbeans.size(); i++) {
+            unregisterMBean(mbeans.get(i));
+        }
+        
         Iterator<MailQueue> it = queues.values().iterator();
         while(it.hasNext()) {
             LifecycleUtil.dispose(it.next());
         }
+
     }
     
     @Resource(name="jmsConnectionFactory")
@@ -73,6 +92,10 @@ public class JMSMailQueueFactory impleme
         MailQueue queue = queues.get(name);
         if (queue == null) {
             queue = createMailQueue(name, useJMX);
+            if (useJMX) {
+                registerMBean(name, queue);
+             
+            }
             queues.put(name, queue);
         }
 
@@ -87,13 +110,39 @@ public class JMSMailQueueFactory impleme
      * @return queue
      */
     protected MailQueue createMailQueue(String name, boolean useJMX) {
-       try {
-            return new JMSMailQueue(connectionFactory, name, useJMX, log);
-        } catch (NotCompliantMBeanException e) {
-            throw new RuntimeException("Unable to register MBean ", e);
-        }
+        return new JMSMailQueue(connectionFactory, name, log);
     }
 
+    protected synchronized void registerMBean(String queuename, MailQueue 
queue) {
+        
+        String mbeanName = "org.apache.james:type=component,name=queue,queue=" 
+ queuename;
+        try {
+            MailQueueManagementMBean mbean = null;
+            if (queue instanceof ManageableMailQueue) {
+                mbean = new JMSMailQueueManagement((ManageableMailQueue)queue);
+            } else if (queue instanceof MailQueueManagementMBean) {
+                mbean = (MailQueueManagementMBean) queue;
+            }
+            if (mbean != null) {
+                mbeanServer.registerMBean(mbean, new ObjectName(mbeanName));
+                mbeans.add(mbeanName);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to register mbean" , e);
+        }
+        
+    }
+    
+    protected synchronized void unregisterMBean(String mbeanName){
+        try {
+            mbeanServer.unregisterMBean(new ObjectName(mbeanName));
+            mbeans.remove(mbeanName);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to unregister mbean" , e);
+        }
+        
+    }
+    
     /*
      * (non-Javadoc)
      * @see 
org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)

Added: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java?rev=1034492&view=auto
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
 (added)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueManagement.java
 Fri Nov 12 17:49:06 2010
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.apache.james.queue.api.MailQueueManagementMBean;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.MailQueue.MailQueueException;
+import org.apache.james.queue.api.ManageableMailQueue.Type;
+
+/**
+ * 
+ * JMX MBean implementation which expose management functions by wrapping a 
{...@link ManageableMailQueue}
+ * 
+ *
+ */
+public class JMSMailQueueManagement extends StandardMBean implements 
MailQueueManagementMBean{
+    private final ManageableMailQueue queue;
+
+    public JMSMailQueueManagement(ManageableMailQueue queue) throws 
NotCompliantMBeanException {
+        super(MailQueueManagementMBean.class);
+        this.queue = queue;
+        
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#clear()
+     */
+    public long clear() {
+        try {
+            return queue.clear();
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#flush()
+     */
+    public long flush() {
+        try {
+            return queue.flush();
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.api.MailQueueManagementMBean#getSize()
+     */
+    public long getSize() {
+        try {
+            return queue.getSize();
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithName(java.lang.String)
+     */
+    public long removeWithName(String name) {
+        try {
+            return queue.remove(Type.Name, name);
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithRecipient(java.lang.String)
+     */
+    public long removeWithRecipient(String address) {
+        try {
+            return queue.remove(Type.Recipient, address);
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.queue.api.MailQueueManagementMBean#removeWithSender(java.lang.String)
+     */
+    public long removeWithSender(String address) {
+        try {
+            return queue.remove(Type.Sender, address);
+        } catch (MailQueueException e) {
+            return -1;
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to