This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f7cc352d7a189ad10d0dcdb71c8d6aded07bb439
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Nov 18 13:06:37 2020 +0700

    JAMES-3441 [Refactoring] Strongly type & extract JamesMailSpooler configuration
    
    This eases injection overrides for this configuration, and thus eases testing.
---
 .../modules/server/CamelMailetContainerModule.java | 12 ++-
 .../mailetcontainer/impl/JamesMailSpooler.java     | 91 ++++++++++++++++------
 2 files changed, 76 insertions(+), 27 deletions(-)

diff --git 
a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
 
b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
index 68c7d06..0364045 100644
--- 
a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
+++ 
b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
@@ -45,6 +45,7 @@ import 
org.apache.james.mailetcontainer.impl.JamesMailetContext;
 import org.apache.james.mailetcontainer.impl.MatcherMailetPair;
 import org.apache.james.mailetcontainer.impl.camel.CamelCompositeProcessor;
 import org.apache.james.mailetcontainer.impl.camel.CamelMailetProcessor;
+import org.apache.james.mailrepository.api.MailRepositoryStore;
 import org.apache.james.server.core.configuration.ConfigurationProvider;
 import org.apache.james.transport.mailets.RemoveMimeHeader;
 import org.apache.james.transport.matchers.All;
@@ -123,12 +124,19 @@ public class CamelMailetContainerModule extends 
AbstractModule {
         return camelContext;
     }
 
+    @Provides
+    @Singleton
+    public JamesMailSpooler.Configuration 
spoolerConfiguration(MailRepositoryStore mailRepositoryStore, 
ConfigurationProvider configurationProvider) {
+        HierarchicalConfiguration<ImmutableNode> conf = 
getJamesSpoolerConfiguration(configurationProvider);
+        return JamesMailSpooler.Configuration.from(mailRepositoryStore, conf);
+    }
+
     @ProvidesIntoSet
-    InitializationOperation startSpooler(JamesMailSpooler jamesMailSpooler, 
ConfigurationProvider configurationProvider) {
+    InitializationOperation startSpooler(JamesMailSpooler jamesMailSpooler, 
JamesMailSpooler.Configuration configuration) {
         return InitilizationOperationBuilder
             .forClass(JamesMailSpooler.class)
             .init(() -> {
-                
jamesMailSpooler.configure(getJamesSpoolerConfiguration(configurationProvider));
+                jamesMailSpooler.configure(configuration);
                 jamesMailSpooler.init();
             });
     }
diff --git 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 6244e4c..cb15deb 100644
--- 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -53,6 +53,7 @@ import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
@@ -70,24 +71,24 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
         private final MetricFactory metricFactory;
         private final MailProcessor mailProcessor;
         private final MailRepository errorRepository;
-        private final MailRepositoryUrl errorRepositoryURL;
         private final reactor.core.Disposable disposable;
         private final MailQueue queue;
-        private final int concurrencyLevel;
+        private final Configuration configuration;
 
-        private Runner(MetricFactory metricFactory, MailProcessor 
mailProcessor, MailRepository errorRepository, MailRepositoryUrl 
errorRepositoryURL, MailQueue queue, int concurrencyLevel) {
+        private Runner(MetricFactory metricFactory, MailProcessor 
mailProcessor,
+                       MailRepository errorRepository, MailQueue queue, 
Configuration configuration) {
             this.metricFactory = metricFactory;
             this.mailProcessor = mailProcessor;
             this.errorRepository = errorRepository;
-            this.errorRepositoryURL = errorRepositoryURL;
-            this.disposable = run(queue);
             this.queue = queue;
-            this.concurrencyLevel = concurrencyLevel;
+            this.configuration = configuration;
+
+            this.disposable = run(queue);
         }
 
         private reactor.core.Disposable run(MailQueue queue) {
             return Flux.from(queue.deQueue())
-                .flatMap(item -> 
handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel)
+                .flatMap(item -> 
handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), 
configuration.getConcurrencyLevel())
                 .onErrorContinue((throwable, item) -> LOGGER.error("Exception 
processing mail while spooling {}", item, throwable))
                 .subscribeOn(Schedulers.elastic())
                 .subscribe();
@@ -130,7 +131,7 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
 
             try {
                 if (failureCount > MAXIMUM_FAILURE_COUNT) {
-                    LOGGER.error("Failed {} processing {} consecutive times. 
Abort. Mail is saved in {}", mail.getName(), failureCount, 
errorRepositoryURL.asString());
+                    LOGGER.error("Failed {} processing {} consecutive times. 
Abort. Mail is saved in {}", mail.getName(), failureCount, 
configuration.getErrorRepositoryURL().asString());
                     storeInErrorRepository(queueItem);
                 } else {
                     LOGGER.error("Failed {} processing {} consecutive times. 
Mail is requeued with increased failure count.", mail.getName(), failureCount, 
processingException);
@@ -185,6 +186,50 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
         }
     }
 
+    public static class Configuration {
+        public static Configuration from(MailRepositoryStore 
mailRepositoryStore, HierarchicalConfiguration<ImmutableNode> config) {
+            int concurrencyLevel = config.getInt("threads", 100);
+            MailRepositoryUrl errorRepositoryURL = 
Optional.ofNullable(config.getString("errorRepository", null))
+                .map(MailRepositoryUrl::from)
+                .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
+                    mailRepositoryStore.defaultProtocol()
+                        .orElseThrow(() -> new IllegalStateException("Cannot 
retrieve mailRepository URL, you need to configure an `errorRepository` 
property for the spooler.0")),
+                    ERROR_REPOSITORY_PATH));
+
+            return new Configuration(concurrencyLevel, errorRepositoryURL);
+        }
+
+        private final int concurrencyLevel;
+        private final MailRepositoryUrl errorRepositoryURL;
+
+        public Configuration(int concurrencyLevel, MailRepositoryUrl 
errorRepositoryURL) {
+            Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' 
needs to be greater than or equal to zero");
+            
+            this.concurrencyLevel = concurrencyLevel;
+            this.errorRepositoryURL = errorRepositoryURL;
+        }
+
+        public int getConcurrencyLevel() {
+            return concurrencyLevel;
+        }
+
+        public boolean isEnabled() {
+            return concurrencyLevel > 0;
+        }
+
+        public MailRepositoryUrl getErrorRepositoryURL() {
+            return errorRepositoryURL;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("concurrencyLevel", concurrencyLevel)
+                .add("errorRepositoryURL", errorRepositoryURL)
+                .toString();
+        }
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JamesMailSpooler.class);
 
     public static final String SPOOL_PROCESSING = "spoolProcessing";
@@ -201,8 +246,7 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
     private final MailRepositoryStore mailRepositoryStore;
     private final MailQueueFactory<?> queueFactory;
 
-    private int concurrencyLevel;
-    private MailRepositoryUrl errorRepositoryURL;
+    private Configuration configuration;
     private Optional<Runner> runner;
 
     @Inject
@@ -215,13 +259,11 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
 
     @Override
     public void configure(HierarchicalConfiguration<ImmutableNode> config) {
-        concurrencyLevel = config.getInt("threads", 100);
-        errorRepositoryURL = 
Optional.ofNullable(config.getString("errorRepository", null))
-            .map(MailRepositoryUrl::from)
-            .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
-                mailRepositoryStore.defaultProtocol()
-                    .orElseThrow(() -> new IllegalStateException("Cannot 
retrieve mailRepository URL, you need to configure an `errorRepository` 
property for the spooler.0")),
-                ERROR_REPOSITORY_PATH));
+        configure(Configuration.from(mailRepositoryStore, config));
+    }
+
+    public void configure(Configuration configuration) {
+        this.configuration = configuration;
     }
 
     /**
@@ -229,22 +271,21 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
      */
     @PostConstruct
     public void init() {
-        Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' needs to 
be greater than or equal to zero");
-        if (concurrencyLevel > 0) {
+        if (configuration.isEnabled()) {
             LOGGER.info("init...");
-            LOGGER.info("Concurrency level is {}", concurrencyLevel);
-            MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, 
MailQueueFactory.prefetchCount(concurrencyLevel));
+            LOGGER.info("Concurrency level is {}", 
configuration.getConcurrencyLevel());
+            MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, 
MailQueueFactory.prefetchCount(configuration.getConcurrencyLevel()));
             runner = Optional.of(new Runner(metricFactory,
-                mailProcessor, errorRepository(), errorRepositoryURL, queue, 
concurrencyLevel));
+                mailProcessor, errorRepository(), queue, configuration));
             LOGGER.info("Spooler started");
         } else {
-            LOGGER.info("Spooler had been dis-activated. To enable is set 
'threads' count to a value greater than zero");
+            LOGGER.info("Spooler had been deactivated. To enable it set 
'threads' count to a value greater than zero");
         }
     }
 
     private MailRepository errorRepository() {
         try {
-            return mailRepositoryStore.select(errorRepositoryURL);
+            return 
mailRepositoryStore.select(configuration.getErrorRepositoryURL());
         } catch (MailRepositoryStore.MailRepositoryStoreException e) {
             throw new RuntimeException(e);
         }
@@ -267,7 +308,7 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
 
     @Override
     public int getThreadCount() {
-        return concurrencyLevel;
+        return configuration.getConcurrencyLevel();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to