JAMES-2294 Introduce reprocessing tasks
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e6fdeae7 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e6fdeae7 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e6fdeae7 Branch: refs/heads/master Commit: e6fdeae70fc38390289f70726084dcce3adfa872 Parents: fac7e59 Author: benwa <[email protected]> Authored: Thu Jan 25 10:35:24 2018 +0700 Committer: benwa <[email protected]> Committed: Fri Jan 26 08:12:05 2018 +0700 ---------------------------------------------------------------------- .../webadmin/webadmin-mailrepository/pom.xml | 9 ++ .../service/MailRepositoryStoreService.java | 2 +- .../service/ReprocessingAllMailsTask.java | 115 +++++++++++++++++++ .../service/ReprocessingOneMailTask.java | 102 ++++++++++++++++ .../webadmin/service/ReprocessingService.java | 76 ++++++++++++ 5 files changed, 303 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/pom.xml ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/pom.xml b/server/protocols/webadmin/webadmin-mailrepository/pom.xml index 367ea83..72b374a 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/pom.xml +++ b/server/protocols/webadmin/webadmin-mailrepository/pom.xml @@ -54,6 +54,15 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>james-server-queue-api</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>james-server-queue-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>james-server-util-java8</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java index 0f7c752..7df346b 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/MailRepositoryStoreService.java @@ -92,7 +92,7 @@ public class MailRepositoryStoreService { return new ClearMailRepositoryTask(getRepository(url), url); } - private MailRepository getRepository(String url) throws MailRepositoryStore.MailRepositoryStoreException { + public MailRepository getRepository(String url) throws MailRepositoryStore.MailRepositoryStoreException { return mailRepositoryStore.get(url) .orElseThrow(() -> ErrorResponder.builder() .statusCode(HttpStatus.NOT_FOUND_404) http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java new file mode 100644 index 0000000..6f75e2e --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java @@ -0,0 +1,115 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import javax.mail.MessagingException; + +import org.apache.james.mailrepository.api.MailRepositoryStore; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +public class ReprocessingAllMailsTask implements Task { + + public static final String TYPE = "reprocessingAllTask"; + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final String repositoryUrl; + private final String targetQueue; + private final Optional<String> targetProcessor; + private final long initialCount; + private final AtomicLong processedCount; + + public AdditionalInformation(String repositoryUrl, String targetQueue, Optional<String> targetProcessor, long initialCount) { + this.repositoryUrl = repositoryUrl; + this.targetQueue = targetQueue; + this.targetProcessor = targetProcessor; + this.initialCount = initialCount; + this.processedCount = new AtomicLong(0); + } + + public String getTargetQueue() { + return targetQueue; + } + + public Optional<String> getTargetProcessor() { + return targetProcessor; + } + + public String getRepositoryUrl() { + return repositoryUrl; + } + + public long getRemainingCount() { + return initialCount - processedCount.get(); + } + + public long getInitialCount() { + return initialCount; + } + + @JsonIgnore + public void notifyProgress(String key) { + processedCount.incrementAndGet(); + } + } + + private final ReprocessingService reprocessingService; + private final String repositoryUrl; + private final String targetQueue; + private final Optional<String> targetProcessor; + private final AdditionalInformation additionalInformation; + + public ReprocessingAllMailsTask(ReprocessingService reprocessingService, long repositorySize, + String repositoryUrl, String targetQueue, Optional<String> targetProcessor) { + this.reprocessingService = reprocessingService; + this.repositoryUrl = repositoryUrl; + this.targetQueue = targetQueue; + this.targetProcessor = targetProcessor; + this.additionalInformation = new AdditionalInformation( + repositoryUrl, targetQueue, targetProcessor, repositorySize); + } + + @Override + public Result run() { + try { + reprocessingService.reprocessAll(repositoryUrl, targetProcessor, targetQueue, additionalInformation::notifyProgress); + return Result.COMPLETED; + } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) { + LOGGER.error("Encountered error while reprocessing repository", e); + return Result.PARTIAL; + } + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(additionalInformation); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java new file mode 100644 index 0000000..d476bfc --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.Optional; + +import javax.mail.MessagingException; + +import org.apache.james.mailrepository.api.MailRepositoryStore; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; + +public class ReprocessingOneMailTask implements Task { + + public static final String TYPE = "reprocessingOneTask"; + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final String repositoryUrl; + private final String targetQueue; + private final String mailKey; + private final Optional<String> targetProcessor; + + public AdditionalInformation(String repositoryUrl, String targetQueue, String mailKey, Optional<String> targetProcessor) { + this.repositoryUrl = repositoryUrl; + this.targetQueue = targetQueue; + this.mailKey = mailKey; + this.targetProcessor = targetProcessor; + } + + public String getMailKey() { + return mailKey; + } + + public String getTargetQueue() { + return targetQueue; + } + + public Optional<String> getTargetProcessor() { + return targetProcessor; + } + + public String getRepositoryUrl() { + return repositoryUrl; + } + } + + private final ReprocessingService reprocessingService; + private final String repositoryUrl; + private final String targetQueue; + private final String mailKey; + private final Optional<String> targetProcessor; + private final AdditionalInformation additionalInformation; + + public ReprocessingOneMailTask(ReprocessingService reprocessingService, + String repositoryUrl, String targetQueue, String mailKey, Optional<String> targetProcessor) { + this.reprocessingService = reprocessingService; + this.repositoryUrl = repositoryUrl; + this.targetQueue = targetQueue; + this.mailKey = mailKey; + this.targetProcessor = targetProcessor; + this.additionalInformation = new AdditionalInformation(repositoryUrl, targetQueue, mailKey, targetProcessor); + } + + @Override + public Result run() { + try { + reprocessingService.reprocess(repositoryUrl, mailKey, targetProcessor, targetQueue); + return Result.COMPLETED; + } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) { + LOGGER.error("Encountered error while reprocessing repository", e); + return Result.PARTIAL; + } + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(additionalInformation); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/e6fdeae7/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java new file mode 100644 index 0000000..8eee51b --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java @@ -0,0 +1,76 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.Optional; +import java.util.function.Consumer; + +import javax.inject.Inject; +import javax.mail.MessagingException; + +import org.apache.james.mailrepository.api.MailRepository; +import org.apache.james.mailrepository.api.MailRepositoryStore; +import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.util.streams.Iterators; +import org.apache.mailet.Mail; + +import com.github.fge.lambdas.Throwing; + +public class ReprocessingService { + + private final MailQueueFactory<?> mailQueueFactory; + private final MailRepositoryStoreService mailRepositoryStoreService; + + @Inject + public ReprocessingService(MailQueueFactory<?> mailQueueFactory, + MailRepositoryStoreService mailRepositoryStoreService) { + this.mailQueueFactory = mailQueueFactory; + this.mailRepositoryStoreService = mailRepositoryStoreService; + } + + public void reprocessAll(String url, Optional<String> targetProcessor, String targetQueue, Consumer<String> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { + MailRepository repository = mailRepositoryStoreService.getRepository(url); + MailQueue mailQueue = getMailQueue(targetQueue); + + Iterators.toStream(repository.list()) + .peek(keyListener) + .forEach(Throwing.consumer(key -> reprocess(repository, mailQueue, key, targetProcessor))); + } + + public void reprocess(String url, String key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { + MailRepository repository = mailRepositoryStoreService.getRepository(url); + MailQueue mailQueue = getMailQueue(targetQueue); + + reprocess(repository, mailQueue, key, targetProcessor); + } + + private void reprocess(MailRepository repository, MailQueue mailQueue, String key, Optional<String> targetProcessor) throws MessagingException { + Mail mail = repository.retrieve(key); + targetProcessor.ifPresent(mail::setState); + mailQueue.enQueue(mail); + repository.remove(key); + } + + private MailQueue getMailQueue(String targetQueue) { + return mailQueueFactory.getQueue(targetQueue) + .orElseThrow(() -> new RuntimeException("Can not find queue " + targetQueue)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
