This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new 70c7ad2a5f JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1464)
70c7ad2a5f is described below

commit 70c7ad2a5fbb31bc1f31cf734624d80a171cc265
Author: Benoit TELLIER <[email protected]>
AuthorDate: Sat Feb 25 21:14:12 2023 +0700

    JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1464)
---
 .../queue/activemq/ActiveMQCacheableMailQueue.java | 27 ++++++++++++++++++----
 .../james/queue/jms/JMSCacheableMailQueue.java     |  7 +++---
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index 01596cb926..6660a2a7db 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.activemq;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.List;
@@ -56,6 +57,8 @@ import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+
 /**
  * <p>
  * {@link MailQueue} implementation which use an ActiveMQ Queue.
@@ -226,10 +229,26 @@ public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements
     }
 
     @Override
-    protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
-        Mail mail = createMail(message);
-        ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
-        return mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName);
+    protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+        try {
+            Mail mail = createMail(message);
+            ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
+            return Mono.just(mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName));
+        } catch (MessagingException e) {
+            if (e.getCause() instanceof FileNotFoundException) {
+                LOGGER.warn("Blob message cannot be found, discarding email", e);
+                try {
+                    session.commit();
+                } catch (JMSException ex) {
+                    throw new MailQueueException("Unable to commit dequeue operation for mail", ex);
+                } finally {
+                    JMSCacheableMailQueue.closeConsumer(consumer);
+                    JMSCacheableMailQueue.closeSession(session);
+                }
+                return Mono.empty();
+            }
+            return Mono.error(e);
+        }
     }
 
     @Override
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 90d60e01c6..9ec0ff852e 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -246,7 +246,7 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
 
             if (message != null) {
                 dequeuedMailsMetric.increment();
-                return Mono.just(createMailQueueItem(session, consumer, message));
+                return createMailQueueItem(session, consumer, message);
             } else {
                 session.commit();
                 closeConsumer(consumer);
@@ -405,7 +405,6 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
         } else {
             throw new MailQueueException("Not supported JMS Message received " + message);
         }
-
     }
 
     /**
@@ -509,10 +508,10 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
      * @throws JMSException
      * @throws MessagingException
      */
-    protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+    protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
         Mail mail = createMail(message);
         JMSMailQueueItem jmsMailQueueItem = new JMSMailQueueItem(mail, session, consumer);
-        return mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName);
+        return Mono.just(mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName));
     }
 
     protected String getMessageSelector() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to