JAMES-2332 Re-use non-transacted sessions in JMSMailQueue and its sub-classes.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b8229df3
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b8229df3
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b8229df3

Branch: refs/heads/master
Commit: b8229df3b09c4ca59aa23a8f2a3f65c20e4ab1f4
Parents: 16d7104
Author: benwa <btell...@linagora.com>
Authored: Tue Feb 6 10:32:26 2018 +0700
Committer: benwa <btell...@linagora.com>
Committed: Tue Feb 6 10:38:15 2018 +0700

----------------------------------------------------------------------
 .../james/queue/activemq/ActiveMQMailQueue.java | 22 +-----
 .../apache/james/queue/jms/JMSMailQueue.java    | 83 +++++++++-----------
 2 files changed, 39 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b8229df3/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
index 142b77b..e968196 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
@@ -141,8 +141,8 @@ public class ActiveMQMailQueue extends JMSMailQueue 
implements ActiveMQSupport {
     /**
      * Produce the mail to the JMS Queue
      */
-    protected void produceMail(Session session, Map<String, Object> props, int 
msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
-        MessageProducer producer = null;
+    @Override
+    protected void produceMail(Map<String, Object> props, int msgPrio, Mail 
mail) throws JMSException, MessagingException, IOException {
         BlobMessage blobMessage = null;
         boolean reuse = false;
 
@@ -189,28 +189,20 @@ public class ActiveMQMailQueue extends JMSMailQueue 
implements ActiveMQSupport {
                 // store the queue name in the props
                 props.put(JAMES_QUEUE_NAME, queueName);
 
-                Queue queue = session.createQueue(queueName);
-
-                producer = session.createProducer(queue);
                 for (Map.Entry<String, Object> entry : props.entrySet()) {
                     blobMessage.setObjectProperty(entry.getKey(), 
entry.getValue());
                 }
                 producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, 
msgPrio, Message.DEFAULT_TIME_TO_LIVE);
-                    
-              
 
             } else {
-                super.produceMail(session, props, msgPrio, mail);
+                super.produceMail(props, msgPrio, mail);
             }
         } catch (JMSException e) {
             if (!reuse && blobMessage != null && blobMessage instanceof 
ActiveMQBlobMessage) {
                 ((ActiveMQBlobMessage) blobMessage).deleteFile();
             }
             throw e;
-        } finally {
-            closeProducer(producer);
         }
-
     }
 
     /**
@@ -272,15 +264,11 @@ public class ActiveMQMailQueue extends JMSMailQueue 
implements ActiveMQSupport {
      */
     @Override
     public long getSize() throws MailQueueException {
-
-        Session session = null;
         MessageConsumer consumer = null;
         MessageProducer producer = null;
         TemporaryQueue replyTo = null;
-        long size;
 
         try {
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             replyTo = session.createTemporaryQueue();
             consumer = session.createConsumer(replyTo);
 
@@ -296,8 +284,7 @@ public class ActiveMQMailQueue extends JMSMailQueue 
implements ActiveMQSupport {
             MapMessage reply = (MapMessage) consumer.receive(2000);
             if (reply != null && reply.itemExists("size")) {
                 try {
-                    size = reply.getLong("size");
-                    return size;
+                    return reply.getLong("size");
                 } catch (NumberFormatException e) {
                     return super.getSize();
                 }
@@ -321,7 +308,6 @@ public class ActiveMQMailQueue extends JMSMailQueue 
implements ActiveMQSupport {
                     LOGGER.error("Error while deleting temporary queue", e);
                 }
             }
-            closeSession(session);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/b8229df3/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
index 01f23d5..c6d17d8 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
@@ -133,13 +133,18 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSMailQueue.class);
 
+    public static final String FORCE_DELIVERY = "FORCE_DELIVERY";
+
     protected final String queueName;
     protected final Connection connection;
     protected final MailQueueItemDecoratorFactory 
mailQueueItemDecoratorFactory;
     protected final Metric enqueuedMailsMetric;
     protected final Metric mailQueueSize;
     protected final MetricFactory metricFactory;
-    public static final String FORCE_DELIVERY = "FORCE_DELIVERY";
+
+    protected final Session session;
+    protected final Queue queue;
+    protected final MessageProducer producer;
 
     public JMSMailQueue(ConnectionFactory connectionFactory, 
MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queueName, 
MetricFactory metricFactory) {
         try {
@@ -153,6 +158,14 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
         this.metricFactory = metricFactory;
         this.enqueuedMailsMetric = metricFactory.generate("enqueuedMail:" + 
queueName);
         this.mailQueueSize = metricFactory.generate("mailQueueSize:" + 
queueName);
+
+        try {
+            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(queueName);
+            producer = session.createProducer(queue);
+        } catch (JMSException e) {
+            throw Throwables.propagate(e);
+        }
     }
 
     @Override
@@ -211,14 +224,11 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
     @Override
     public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
         TimeMetric timeMetric = metricFactory.timer("enqueueMailTime:" + 
queueName);
-        Session session = null;
 
         long nextDeliveryTimestamp = computeNextDeliveryTimestamp(delay, unit);
 
         try {
 
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
             int msgPrio = NORMAL_PRIORITY;
             Object prio = mail.getAttribute(MAIL_PRIORITY);
             if (prio instanceof Integer) {
@@ -227,7 +237,7 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
 
             Map<String, Object> props = getJMSProperties(mail, 
nextDeliveryTimestamp);
 
-            produceMail(session, props, msgPrio, mail);
+            produceMail(props, msgPrio, mail);
 
             enqueuedMailsMetric.increment();
             mailQueueSize.increment();
@@ -235,7 +245,6 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
             throw new MailQueueException("Unable to enqueue mail " + mail, e);
         } finally {
             timeMetric.stopAndPublish();
-            closeSession(session);
         }
     }
 
@@ -257,38 +266,28 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
     /**
      * Produce the mail to the JMS Queue
      */
-    protected void produceMail(Session session, Map<String, Object> props, int 
msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
-        MessageProducer producer = null;
-
-        try {
-            Queue queue = session.createQueue(queueName);
-
-            producer = session.createProducer(queue);
-            ObjectMessage message = session.createObjectMessage();
-
-            for (Map.Entry<String, Object> entry : props.entrySet()) {
-                message.setObjectProperty(entry.getKey(), entry.getValue());
-            }
+    protected void produceMail(Map<String, Object> props, int msgPrio, Mail 
mail) throws JMSException, MessagingException, IOException {
+        ObjectMessage message = session.createObjectMessage();
 
-            long size = mail.getMessageSize();
-            ByteArrayOutputStream out;
-            if (size > -1) {
-                out = new ByteArrayOutputStream((int) size);
-            } else {
-                out = new ByteArrayOutputStream();
-            }
-            mail.getMessage().writeTo(out);
+        for (Map.Entry<String, Object> entry : props.entrySet()) {
+            message.setObjectProperty(entry.getKey(), entry.getValue());
+        }
 
-            // store the byte array in a ObjectMessage so we can use a
-            // SharedByteArrayInputStream later
-            // without the need of copy the day
-            message.setObject(out.toByteArray());
+        long size = mail.getMessageSize();
+        ByteArrayOutputStream out;
+        if (size > -1) {
+            out = new ByteArrayOutputStream((int) size);
+        } else {
+            out = new ByteArrayOutputStream();
+        }
+        mail.getMessage().writeTo(out);
 
-            producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, 
Message.DEFAULT_TIME_TO_LIVE);
+        // store the byte array in a ObjectMessage so we can use a
+        // SharedByteArrayInputStream later
+        // without the need of copy the day
+        message.setObject(out.toByteArray());
 
-        } finally {
-            closeProducer(producer);
-        }
+        producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, 
Message.DEFAULT_TIME_TO_LIVE);
     }
 
     protected Map<String, Object> getJMSProperties(Mail mail, long 
nextDelivery) throws MessagingException {
@@ -485,13 +484,9 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
     @SuppressWarnings("unchecked")
     @Override
     public long getSize() throws MailQueueException {
-        Session session = null;
         QueueBrowser browser = null;
         int size = 0;
         try {
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(queueName);
-
             browser = session.createBrowser(queue);
 
             Enumeration<Message> messages = browser.getEnumeration();
@@ -506,7 +501,6 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
             throw new MailQueueException("Unable to get size of queue " + 
queueName, e);
         } finally {
             closeBrowser(browser);
-            closeSession(session);
         }
     }
 
@@ -649,17 +643,11 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
     @Override
     @SuppressWarnings("unchecked")
     public MailQueueIterator browse() throws MailQueueException {
-        Session session = null;
         QueueBrowser browser = null;
         try {
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(queueName);
-
             browser = session.createBrowser(queue);
 
             final Enumeration<Message> messages = browser.getEnumeration();
-
-            final Session mySession = session;
             final QueueBrowser myBrowser = browser;
 
             return new MailQueueIterator() {
@@ -696,14 +684,11 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
                 @Override
                 public void close() {
                     closeBrowser(myBrowser);
-                    closeSession(mySession);
                 }
             };
 
         } catch (Exception e) {
-
             closeBrowser(browser);
-            closeSession(session);
 
             LOGGER.error("Unable to browse queue {}", queueName, e);
             throw new MailQueueException("Unable to browse queue " + 
queueName, e);
@@ -713,6 +698,8 @@ public class JMSMailQueue implements ManageableMailQueue, 
JMSSupport, MailPriori
     @Override
     public void dispose() {
         try {
+            closeProducer(producer);
+            closeSession(session);
             connection.close();
         } catch (JMSException e) {
             LOGGER.error("Error while closing session", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to