This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ef2890346dce94ad957310a1c9bcbee58404b041
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Nov 18 12:53:26 2020 +0700

    JAMES-3441 [REFACTORING] Use Optionals and encapsulating class for 
JamesMailSpooler mutable content
---
 .../mailetcontainer/impl/JamesMailSpooler.java     | 240 +++++++++++----------
 1 file changed, 132 insertions(+), 108 deletions(-)

diff --git 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index b6ceddb..23623db 100644
--- 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -63,6 +63,126 @@ import reactor.core.scheduler.Schedulers;
  * them from the spool when processing is complete.
  */
 public class JamesMailSpooler implements Disposable, Configurable, 
MailSpoolerMBean {
+    private static class Runner {
+        private final AtomicInteger processingActive = new AtomicInteger(0);
+        private final MetricFactory metricFactory;
+        private final MailProcessor mailProcessor;
+        private final MailRepository errorRepository;
+        private final MailRepositoryUrl errorRepositoryURL;
+        private final reactor.core.Disposable disposable;
+        private final MailQueue queue;
+        private final int concurrencyLevel;
+
+        private Runner(MetricFactory metricFactory, MailProcessor 
mailProcessor, MailRepository errorRepository, MailRepositoryUrl 
errorRepositoryURL, MailQueue queue, int concurrencyLevel) {
+            this.metricFactory = metricFactory;
+            this.mailProcessor = mailProcessor;
+            this.errorRepository = errorRepository;
+            this.errorRepositoryURL = errorRepositoryURL;
+            this.disposable = run(queue);
+            this.queue = queue;
+            this.concurrencyLevel = concurrencyLevel;
+        }
+
+        private reactor.core.Disposable run(MailQueue queue) {
+            return Flux.from(queue.deQueue())
+                .flatMap(item -> 
handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel)
+                .onErrorContinue((throwable, item) -> LOGGER.error("Exception 
processing mail while spooling {}", item, throwable))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+        }
+
+        private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
+            TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
+            return Mono.fromCallable(processingActive::incrementAndGet)
+                .flatMap(ignore -> processMail(queueItem))
+                .doOnSuccess(any -> 
timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))
+                .doOnTerminate(processingActive::decrementAndGet);
+        }
+
+        private Mono<Void> processMail(MailQueueItem queueItem) {
+            return Mono
+                .using(
+                    queueItem::getMail,
+                    mail -> Mono.fromRunnable(() -> 
performProcessMail(queueItem, mail)),
+                    LifecycleUtil::dispose);
+        }
+
+        private void performProcessMail(MailQueueItem queueItem, Mail mail) {
+            LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
+            try {
+                mailProcessor.service(mail);
+
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread has been 
interrupted");
+                }
+                queueItem.done(true);
+            } catch (Exception e) {
+                handleError(queueItem, mail, e);
+            } finally {
+                LOGGER.debug("==== End processing mail {} ====", 
mail.getName());
+            }
+        }
+
+        private void handleError(MailQueueItem queueItem, Mail mail, Exception 
processingException) {
+            int failureCount = computeFailureCount(mail);
+
+            try {
+                if (failureCount > MAXIMUM_FAILURE_COUNT) {
+                    LOGGER.error("Failed {} processing {} consecutive times. 
Abort. Mail is saved in {}", mail.getName(), failureCount, 
errorRepositoryURL.asString());
+                    storeInErrorRepository(queueItem);
+                } else {
+                    LOGGER.error("Failed {} processing {} consecutive times. 
Mail is requeued with increased failure count.", mail.getName(), failureCount, 
processingException);
+                    reEnqueue(queueItem, failureCount);
+                }
+            } catch (Exception nestedE) {
+                LOGGER.error("Could not apply standard error handling for {}, 
defaulting to nack", mail.getName(), nestedE);
+                nack(queueItem, processingException);
+            }
+        }
+
+        private int computeFailureCount(Mail mail) {
+            Integer previousFailureCount = 
mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
+                .flatMap(attribute -> 
attribute.getValue().valueAs(Integer.class))
+                .orElse(0);
+            return previousFailureCount + 1;
+        }
+
+        private void reEnqueue(MailQueueItem queueItem, int failureCount) 
throws MailQueue.MailQueueException {
+            Mail mail = queueItem.getMail();
+            mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, 
AttributeValue.of(failureCount)));
+            queue.enQueue(mail);
+            queueItem.done(true);
+        }
+
+        private void storeInErrorRepository(MailQueueItem queueItem) throws 
MessagingException {
+            errorRepository.store(queueItem.getMail());
+            queueItem.done(true);
+        }
+
+        private void nack(MailQueueItem queueItem, Exception 
processingException) {
+            try {
+                queueItem.done(false);
+            } catch (MailQueue.MailQueueException ex) {
+                throw new RuntimeException(processingException);
+            }
+        }
+
+        public void dispose() {
+            LOGGER.info("start dispose() ...");
+            disposable.dispose();
+            try {
+                queue.close();
+            } catch (IOException e) {
+                LOGGER.debug("error closing queue", e);
+            }
+            LOGGER.info("thread shutdown completed.");
+        }
+
+        public int getCurrentSpoolCount() {
+            return processingActive.get();
+        }
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JamesMailSpooler.class);
 
     public static final String SPOOL_PROCESSING = "spoolProcessing";
@@ -74,23 +194,14 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
      * concurrency level to use for dequeuing mails from spool, allows to 
throttle resources dedicated to that async
      * process.
      */
-    private int concurrencyLevel;
-
-    private final AtomicInteger processingActive = new AtomicInteger(0);
-
     private final MetricFactory metricFactory;
-
-    /**
-     * The mail processor
-     */
     private final MailProcessor mailProcessor;
     private final MailRepositoryStore mailRepositoryStore;
-
     private final MailQueueFactory<?> queueFactory;
+
+    private int concurrencyLevel;
     private MailRepositoryUrl errorRepositoryURL;
-    private MailRepository errorRepository;
-    private reactor.core.Disposable disposable;
-    private MailQueue queue;
+    private Optional<Runner> runner;
 
     @Inject
     public JamesMailSpooler(MetricFactory metricFactory, MailProcessor 
mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> 
queueFactory) {
@@ -118,100 +229,20 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
     public void init() {
         LOGGER.info("init...");
         LOGGER.info("Concurrency level is {}", concurrencyLevel);
-        queue = queueFactory.createQueue(MailQueueFactory.SPOOL, 
MailQueueFactory.prefetchCount(concurrencyLevel));
-        disposable = run(queue);
+        MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, 
MailQueueFactory.prefetchCount(concurrencyLevel));
+        runner = Optional.of(new Runner(metricFactory,
+            mailProcessor, errorRepository(), errorRepositoryURL, queue, 
concurrencyLevel));
         LOGGER.info("Spooler started");
-        try {
-            this.errorRepository = 
mailRepositoryStore.select(errorRepositoryURL);
-        } catch (MailRepositoryStore.MailRepositoryStoreException e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private reactor.core.Disposable run(MailQueue queue) {
-        return Flux.from(queue.deQueue())
-            .flatMap(item -> 
handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel)
-            .onErrorContinue((throwable, item) -> LOGGER.error("Exception 
processing mail while spooling {}", item, throwable))
-            .subscribeOn(Schedulers.elastic())
-            .subscribe();
-    }
-
-    private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
-        TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
-        return Mono.fromCallable(processingActive::incrementAndGet)
-            .flatMap(ignore -> processMail(queueItem))
-            .doOnSuccess(any -> 
timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))
-            .doOnTerminate(processingActive::decrementAndGet);
-    }
-
-    private Mono<Void> processMail(MailQueueItem queueItem) {
-        return Mono
-            .using(
-                queueItem::getMail,
-                mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, 
mail)),
-                LifecycleUtil::dispose);
-    }
-
-    private void performProcessMail(MailQueueItem queueItem, Mail mail) {
-        LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
-        try {
-            mailProcessor.service(mail);
-
-            if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException("Thread has been interrupted");
-            }
-            queueItem.done(true);
-        } catch (Exception e) {
-            handleError(queueItem, mail, e);
-        } finally {
-            LOGGER.debug("==== End processing mail {} ====", mail.getName());
-        }
     }
 
-    private void handleError(MailQueueItem queueItem, Mail mail, Exception 
processingException) {
-        int failureCount = computeFailureCount(mail);
-
+    private MailRepository errorRepository() {
         try {
-            if (failureCount > MAXIMUM_FAILURE_COUNT) {
-                LOGGER.error("Failed {} processing {} consecutive times. 
Abort. Mail is saved in {}", mail.getName(), failureCount, 
errorRepositoryURL.asString());
-                storeInErrorRepository(queueItem);
-            } else {
-                LOGGER.error("Failed {} processing {} consecutive times. Mail 
is requeued with increased failure count.", mail.getName(), failureCount, 
processingException);
-                reEnqueue(queueItem, failureCount);
-            }
-        } catch (Exception nestedE) {
-            LOGGER.error("Could not apply standard error handling for {}, 
defaulting to nack", mail.getName(), nestedE);
-            nack(queueItem, processingException);
+            return mailRepositoryStore.select(errorRepositoryURL);
+        } catch (MailRepositoryStore.MailRepositoryStoreException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    private int computeFailureCount(Mail mail) {
-        Integer previousFailureCount = 
mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
-            .flatMap(attribute -> attribute.getValue().valueAs(Integer.class))
-            .orElse(0);
-        return previousFailureCount + 1;
-    }
-
-    private void reEnqueue(MailQueueItem queueItem, int failureCount) throws 
MailQueue.MailQueueException {
-        Mail mail = queueItem.getMail();
-        mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, 
AttributeValue.of(failureCount)));
-        queue.enQueue(mail);
-        queueItem.done(true);
-    }
-
-    private void storeInErrorRepository(MailQueueItem queueItem) throws 
MessagingException {
-        errorRepository.store(queueItem.getMail());
-        queueItem.done(true);
-    }
-
-    private void nack(MailQueueItem queueItem, Exception processingException) {
-        try {
-            queueItem.done(false);
-        } catch (MailQueue.MailQueueException ex) {
-            throw new RuntimeException(processingException);
-        }
-    }
 
     /**
      * The dispose operation is called at the end of a components lifecycle.
@@ -224,14 +255,7 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
     @PreDestroy
     @Override
     public void dispose() {
-        LOGGER.info("start dispose() ...");
-        disposable.dispose();
-        try {
-            queue.close();
-        } catch (IOException e) {
-            LOGGER.debug("error closing queue", e);
-        }
-        LOGGER.info("thread shutdown completed.");
+        runner.ifPresent(Runner::dispose);
     }
 
     @Override
@@ -241,6 +265,6 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
 
     @Override
     public int getCurrentSpoolCount() {
-        return processingActive.get();
+        return runner.map(Runner::getCurrentSpoolCount).orElse(0);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to