JAMES-2332 Re-use non-transacted sessions in JMSMailQueue and its sub-classes.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b8229df3 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b8229df3 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b8229df3 Branch: refs/heads/master Commit: b8229df3b09c4ca59aa23a8f2a3f65c20e4ab1f4 Parents: 16d7104 Author: benwa <btell...@linagora.com> Authored: Tue Feb 6 10:32:26 2018 +0700 Committer: benwa <btell...@linagora.com> Committed: Tue Feb 6 10:38:15 2018 +0700 ---------------------------------------------------------------------- .../james/queue/activemq/ActiveMQMailQueue.java | 22 +----- .../apache/james/queue/jms/JMSMailQueue.java | 83 +++++++++----------- 2 files changed, 39 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b8229df3/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java index 142b77b..e968196 100644 --- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java +++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java @@ -141,8 +141,8 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { /** * Produce the mail to the JMS Queue */ - protected void produceMail(Session session, Map<String, Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { - MessageProducer producer = null; + @Override + protected void produceMail(Map<String, Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { BlobMessage blobMessage = null; boolean reuse = false; @@ -189,28 +189,20 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { // store the queue name in the props props.put(JAMES_QUEUE_NAME, queueName); - Queue queue = session.createQueue(queueName); - - producer = session.createProducer(queue); for (Map.Entry<String, Object> entry : props.entrySet()) { blobMessage.setObjectProperty(entry.getKey(), entry.getValue()); } producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); - - } else { - super.produceMail(session, props, msgPrio, mail); + super.produceMail(props, msgPrio, mail); } } catch (JMSException e) { if (!reuse && blobMessage != null && blobMessage instanceof ActiveMQBlobMessage) { ((ActiveMQBlobMessage) blobMessage).deleteFile(); } throw e; - } finally { - closeProducer(producer); } - } /** @@ -272,15 +264,11 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { */ @Override public long getSize() throws MailQueueException { - - Session session = null; MessageConsumer consumer = null; MessageProducer producer = null; TemporaryQueue replyTo = null; - long size; try { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); replyTo = session.createTemporaryQueue(); consumer = session.createConsumer(replyTo); @@ -296,8 +284,7 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { MapMessage reply = (MapMessage) consumer.receive(2000); if (reply != null && reply.itemExists("size")) { try { - size = reply.getLong("size"); - return size; + return reply.getLong("size"); } catch (NumberFormatException e) { return super.getSize(); } @@ -321,7 +308,6 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { LOGGER.error("Error while deleting temporary queue", e); } } - closeSession(session); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/b8229df3/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java index 01f23d5..c6d17d8 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java @@ -133,13 +133,18 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori private static final Logger LOGGER = LoggerFactory.getLogger(JMSMailQueue.class); + public static final String FORCE_DELIVERY = "FORCE_DELIVERY"; + protected final String queueName; protected final Connection connection; protected final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; protected final Metric enqueuedMailsMetric; protected final Metric mailQueueSize; protected final MetricFactory metricFactory; - public static final String FORCE_DELIVERY = "FORCE_DELIVERY"; + + protected final Session session; + protected final Queue queue; + protected final MessageProducer producer; public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queueName, MetricFactory metricFactory) { try { @@ -153,6 +158,14 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori this.metricFactory = metricFactory; this.enqueuedMailsMetric = metricFactory.generate("enqueuedMail:" + queueName); this.mailQueueSize = metricFactory.generate("mailQueueSize:" + queueName); + + try { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(queueName); + producer = session.createProducer(queue); + } catch (JMSException e) { + throw Throwables.propagate(e); + } } @Override @@ -211,14 +224,11 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori @Override public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException { TimeMetric timeMetric = metricFactory.timer("enqueueMailTime:" + queueName); - Session session = null; long nextDeliveryTimestamp = computeNextDeliveryTimestamp(delay, unit); try { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - int msgPrio = NORMAL_PRIORITY; Object prio = mail.getAttribute(MAIL_PRIORITY); if (prio instanceof Integer) { @@ -227,7 +237,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori Map<String, Object> props = getJMSProperties(mail, nextDeliveryTimestamp); - produceMail(session, props, msgPrio, mail); + produceMail(props, msgPrio, mail); enqueuedMailsMetric.increment(); mailQueueSize.increment(); @@ -235,7 +245,6 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori throw new MailQueueException("Unable to enqueue mail " + mail, e); } finally { timeMetric.stopAndPublish(); - closeSession(session); } } @@ -257,38 +266,28 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori /** * Produce the mail to the JMS Queue */ - protected void produceMail(Session session, Map<String, Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { - MessageProducer producer = null; - - try { - Queue queue = session.createQueue(queueName); - - producer = session.createProducer(queue); - ObjectMessage message = session.createObjectMessage(); - - for (Map.Entry<String, Object> entry : props.entrySet()) { - message.setObjectProperty(entry.getKey(), entry.getValue()); - } + protected void produceMail(Map<String, Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { + ObjectMessage message = session.createObjectMessage(); - long size = mail.getMessageSize(); - ByteArrayOutputStream out; - if (size > -1) { - out = new ByteArrayOutputStream((int) size); - } else { - out = new ByteArrayOutputStream(); - } - mail.getMessage().writeTo(out); + for (Map.Entry<String, Object> entry : props.entrySet()) { + message.setObjectProperty(entry.getKey(), entry.getValue()); + } - // store the byte array in a ObjectMessage so we can use a - // SharedByteArrayInputStream later - // without the need of copy the day - message.setObject(out.toByteArray()); + long size = mail.getMessageSize(); + ByteArrayOutputStream out; + if (size > -1) { + out = new ByteArrayOutputStream((int) size); + } else { + out = new ByteArrayOutputStream(); + } + mail.getMessage().writeTo(out); - producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); + // store the byte array in a ObjectMessage so we can use a + // SharedByteArrayInputStream later + // without the need of copy the day + message.setObject(out.toByteArray()); - } finally { - closeProducer(producer); - } + producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); } protected Map<String, Object> getJMSProperties(Mail mail, long nextDelivery) throws MessagingException { @@ -485,13 +484,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori @SuppressWarnings("unchecked") @Override public long getSize() throws MailQueueException { - Session session = null; QueueBrowser browser = null; int size = 0; try { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(queueName); - browser = session.createBrowser(queue); Enumeration<Message> messages = browser.getEnumeration(); @@ -506,7 +501,6 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori throw new MailQueueException("Unable to get size of queue " + queueName, e); } finally { closeBrowser(browser); - closeSession(session); } } @@ -649,17 +643,11 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori @Override @SuppressWarnings("unchecked") public MailQueueIterator browse() throws MailQueueException { - Session session = null; QueueBrowser browser = null; try { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(queueName); - browser = session.createBrowser(queue); final Enumeration<Message> messages = browser.getEnumeration(); - - final Session mySession = session; final QueueBrowser myBrowser = browser; return new MailQueueIterator() { @@ -696,14 +684,11 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori @Override public void close() { closeBrowser(myBrowser); - closeSession(mySession); } }; } catch (Exception e) { - closeBrowser(browser); - closeSession(session); LOGGER.error("Unable to browse queue {}", queueName, e); throw new MailQueueException("Unable to browse queue " + queueName, e); @@ -713,6 +698,8 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori @Override public void dispose() { try { + closeProducer(producer); + closeSession(session); connection.close(); } catch (JMSException e) { LOGGER.error("Error while closing session", e); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org