JAMES-2294 Introduce reprocessing tasks

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e6fdeae7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e6fdeae7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e6fdeae7

Branch: refs/heads/master
Commit: e6fdeae70fc38390289f70726084dcce3adfa872
Parents: fac7e59
Author: benwa <[email protected]>
Authored: Thu Jan 25 10:35:24 2018 +0700
Committer: benwa <[email protected]>
Committed: Fri Jan 26 08:12:05 2018 +0700

----------------------------------------------------------------------
 .../webadmin/webadmin-mailrepository/pom.xml    |   9 ++
 .../service/MailRepositoryStoreService.java     |   2 +-
 .../service/ReprocessingAllMailsTask.java       | 115 +++++++++++++++++++
 .../service/ReprocessingOneMailTask.java        | 102 ++++++++++++++++
 .../webadmin/service/ReprocessingService.java   |  76 ++++++++++++
 5 files changed, 303 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/pom.xml
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/pom.xml b/server/protocols/webadmin/webadmin-mailrepository/pom.xml
index 367ea83..72b374a 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/pom.xml
+++ b/server/protocols/webadmin/webadmin-mailrepository/pom.xml
@@ -54,6 +54,15 @@
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>james-server-queue-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>james-server-queue-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>james-server-util-java8</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java
index 0f7c752..7df346b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java
@@ -92,7 +92,7 @@ public class MailRepositoryStoreService {
         return new ClearMailRepositoryTask(getRepository(url), url);
     }
 
-    private MailRepository getRepository(String url) throws 
MailRepositoryStore.MailRepositoryStoreException {
+    public MailRepository getRepository(String url) throws 
MailRepositoryStore.MailRepositoryStoreException {
         return mailRepositoryStore.get(url)
             .orElseThrow(() -> ErrorResponder.builder()
                 .statusCode(HttpStatus.NOT_FOUND_404)

http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
new file mode 100644
index 0000000..6f75e2e
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.mail.MessagingException;
+
+import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public class ReprocessingAllMailsTask implements Task {
+
+    public static final String TYPE = "reprocessingAllTask";
+
+    public static class AdditionalInformation implements 
TaskExecutionDetails.AdditionalInformation {
+        private final String repositoryUrl;
+        private final String targetQueue;
+        private final Optional<String> targetProcessor;
+        private final long initialCount;
+        private final AtomicLong processedCount;
+
+        public AdditionalInformation(String repositoryUrl, String targetQueue, 
Optional<String> targetProcessor, long initialCount) {
+            this.repositoryUrl = repositoryUrl;
+            this.targetQueue = targetQueue;
+            this.targetProcessor = targetProcessor;
+            this.initialCount = initialCount;
+            this.processedCount = new AtomicLong(0);
+        }
+
+        public String getTargetQueue() {
+            return targetQueue;
+        }
+
+        public Optional<String> getTargetProcessor() {
+            return targetProcessor;
+        }
+
+        public String getRepositoryUrl() {
+            return repositoryUrl;
+        }
+
+        public long getRemainingCount() {
+            return initialCount - processedCount.get();
+        }
+
+        public long getInitialCount() {
+            return initialCount;
+        }
+
+        @JsonIgnore
+        public void notifyProgress(String key) {
+            processedCount.incrementAndGet();
+        }
+    }
+
+    private final ReprocessingService reprocessingService;
+    private final String repositoryUrl;
+    private final String targetQueue;
+    private final Optional<String> targetProcessor;
+    private final AdditionalInformation additionalInformation;
+
+    public ReprocessingAllMailsTask(ReprocessingService reprocessingService, 
long repositorySize,
+                                    String repositoryUrl, String targetQueue, 
Optional<String> targetProcessor) {
+        this.reprocessingService = reprocessingService;
+        this.repositoryUrl = repositoryUrl;
+        this.targetQueue = targetQueue;
+        this.targetProcessor = targetProcessor;
+        this.additionalInformation = new AdditionalInformation(
+            repositoryUrl, targetQueue, targetProcessor, repositorySize);
+    }
+
+    @Override
+    public Result run() {
+        try {
+            reprocessingService.reprocessAll(repositoryUrl, targetProcessor, 
targetQueue, additionalInformation::notifyProgress);
+            return Result.COMPLETED;
+        } catch (MessagingException | 
MailRepositoryStore.MailRepositoryStoreException e) {
+            LOGGER.error("Encountered error while reprocessing repository", e);
+            return Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public String type() {
+        return TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(additionalInformation);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
new file mode 100644
index 0000000..d476bfc
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.util.Optional;
+
+import javax.mail.MessagingException;
+
+import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+
+public class ReprocessingOneMailTask implements Task {
+
+    public static final String TYPE = "reprocessingOneTask";
+
+    public static class AdditionalInformation implements 
TaskExecutionDetails.AdditionalInformation {
+        private final String repositoryUrl;
+        private final String targetQueue;
+        private final String mailKey;
+        private final Optional<String> targetProcessor;
+
+        public AdditionalInformation(String repositoryUrl, String targetQueue, 
String mailKey, Optional<String> targetProcessor) {
+            this.repositoryUrl = repositoryUrl;
+            this.targetQueue = targetQueue;
+            this.mailKey = mailKey;
+            this.targetProcessor = targetProcessor;
+        }
+
+        public String getMailKey() {
+            return mailKey;
+        }
+
+        public String getTargetQueue() {
+            return targetQueue;
+        }
+
+        public Optional<String> getTargetProcessor() {
+            return targetProcessor;
+        }
+
+        public String getRepositoryUrl() {
+            return repositoryUrl;
+        }
+    }
+
+    private final ReprocessingService reprocessingService;
+    private final String repositoryUrl;
+    private final String targetQueue;
+    private final String mailKey;
+    private final Optional<String> targetProcessor;
+    private final AdditionalInformation additionalInformation;
+
+    public ReprocessingOneMailTask(ReprocessingService reprocessingService,
+                                   String repositoryUrl, String targetQueue, 
String mailKey, Optional<String> targetProcessor) {
+        this.reprocessingService = reprocessingService;
+        this.repositoryUrl = repositoryUrl;
+        this.targetQueue = targetQueue;
+        this.mailKey = mailKey;
+        this.targetProcessor = targetProcessor;
+        this.additionalInformation = new AdditionalInformation(repositoryUrl, 
targetQueue, mailKey, targetProcessor);
+    }
+
+    @Override
+    public Result run() {
+        try {
+            reprocessingService.reprocess(repositoryUrl, mailKey, 
targetProcessor, targetQueue);
+            return Result.COMPLETED;
+        } catch (MessagingException | 
MailRepositoryStore.MailRepositoryStoreException e) {
+            LOGGER.error("Encountered error while reprocessing repository", e);
+            return Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public String type() {
+        return TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(additionalInformation);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
new file mode 100644
index 0000000..8eee51b
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -0,0 +1,76 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.mailrepository.api.MailRepository;
+import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.util.streams.Iterators;
+import org.apache.mailet.Mail;
+
+import com.github.fge.lambdas.Throwing;
+
+/**
+ * Takes mails out of a mail repository and puts them on a mail queue.
+ *
+ * <p>Each handled mail is retrieved from the repository, optionally given a new state
+ * (the target processor), enqueued on the target queue and finally removed from the
+ * repository.
+ */
+public class ReprocessingService {
+
+    private final MailQueueFactory<?> mailQueueFactory;
+    private final MailRepositoryStoreService mailRepositoryStoreService;
+
+    @Inject
+    public ReprocessingService(MailQueueFactory<?> mailQueueFactory,
+                               MailRepositoryStoreService mailRepositoryStoreService) {
+        this.mailQueueFactory = mailQueueFactory;
+        this.mailRepositoryStoreService = mailRepositoryStoreService;
+    }
+
+    /**
+     * Reprocesses every mail currently listed by the repository.
+     *
+     * @param url URL of the repository to read the mails from
+     * @param targetProcessor when present, state set on each mail before it is enqueued
+     * @param targetQueue name of the queue the mails are enqueued on
+     * @param keyListener notified with each key right before that mail is reprocessed
+     * @throws MailRepositoryStore.MailRepositoryStoreException declared by the repository lookup
+     * @throws MessagingException when listing, retrieving, enqueuing or removing a mail fails
+     * @throws RuntimeException when the target queue can not be found
+     */
+    public void reprocessAll(String url, Optional<String> targetProcessor, String targetQueue, Consumer<String> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+        MailRepository repository = mailRepositoryStoreService.getRepository(url);
+        MailQueue mailQueue = getMailQueue(targetQueue);
+
+        // sneakyThrow() rethrows the original MessagingException instead of wrapping it in an
+        // unchecked exception, so callers catching the declared checked exception really see it.
+        // The listener is invoked inside the consumer rather than through peek(), which is
+        // meant for debugging; the per-key ordering (notify, then reprocess) is unchanged.
+        Iterators.toStream(repository.list())
+            .forEach(Throwing.<String>consumer(key -> {
+                keyListener.accept(key);
+                reprocess(repository, mailQueue, key, targetProcessor);
+            }).sneakyThrow());
+    }
+
+    /**
+     * Reprocesses the single mail stored under {@code key}.
+     *
+     * @param url URL of the repository to read the mail from
+     * @param key key of the mail to reprocess
+     * @param targetProcessor when present, state set on the mail before it is enqueued
+     * @param targetQueue name of the queue the mail is enqueued on
+     * @throws MailRepositoryStore.MailRepositoryStoreException declared by the repository lookup
+     * @throws MessagingException when the mail is missing, or retrieving, enqueuing or removing it fails
+     * @throws RuntimeException when the target queue can not be found
+     */
+    public void reprocess(String url, String key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+        MailRepository repository = mailRepositoryStoreService.getRepository(url);
+        MailQueue mailQueue = getMailQueue(targetQueue);
+
+        reprocess(repository, mailQueue, key, targetProcessor);
+    }
+
+    /**
+     * Retrieves one mail, applies the optional state, enqueues it and removes it from the repository.
+     */
+    private void reprocess(MailRepository repository, MailQueue mailQueue, String key, Optional<String> targetProcessor) throws MessagingException {
+        Mail mail = repository.retrieve(key);
+        // NOTE(review): assumes retrieve() yields null for an unknown key - confirm against the
+        // MailRepository contract. Without this guard such a key ends in a NullPointerException.
+        if (mail == null) {
+            throw new MessagingException("Can not find mail " + key + " in repository");
+        }
+        targetProcessor.ifPresent(mail::setState);
+        // Enqueue before removing so a failing enqueue leaves the mail in the repository.
+        mailQueue.enQueue(mail);
+        repository.remove(key);
+    }
+
+    /**
+     * Looks up the queue by name, failing with a {@link RuntimeException} when it does not exist.
+     */
+    private MailQueue getMailQueue(String targetQueue) {
+        return mailQueueFactory.getQueue(targetQueue)
+            .orElseThrow(() -> new RuntimeException("Can not find queue " + targetQueue));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to