JAMES-2555 Wrap reindexing into a task
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7c4f4d86 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7c4f4d86 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7c4f4d86 Branch: refs/heads/master Commit: 7c4f4d86367d3fc61fcf4854280137551bd6d14b Parents: 884b88f Author: Benoit Tellier <[email protected]> Authored: Thu Oct 11 16:22:57 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Oct 15 13:17:46 2018 +0700 ---------------------------------------------------------------------- mailbox/api/pom.xml | 4 + .../apache/james/mailbox/indexer/ReIndexer.java | 5 +- .../tools/indexer/FullReindexingTask.java | 79 +++++++++ .../mailbox/tools/indexer/ReIndexerImpl.java | 131 +-------------- .../tools/indexer/ReIndexerPerformer.java | 168 +++++++++++++++++++ .../tools/indexer/ReprocessingContext.java | 53 ++++++ .../indexer/SingleMailboxReindexingTask.java | 88 ++++++++++ .../mailbox/tools/indexer/ThrowsReIndexer.java | 5 +- .../indexer/CassandraReIndexerImplTest.java | 4 +- .../tools/indexer/ReIndexerImplTest.java | 6 +- .../cli/ReindexCommandIntegrationTest.java | 7 +- .../adapter/mailbox/ReIndexerManagement.java | 11 +- .../META-INF/org/apache/james/spring-server.xml | 3 + 13 files changed, 428 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/api/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/api/pom.xml b/mailbox/api/pom.xml index 18578c1..56237b3 100644 --- a/mailbox/api/pom.xml +++ b/mailbox/api/pom.xml @@ -38,6 +38,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-task</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java index 302e01c..94304a7 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java @@ -21,11 +21,12 @@ package org.apache.james.mailbox.indexer; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.task.Task; public interface ReIndexer { - void reIndex(MailboxPath path) throws MailboxException; + Task reIndex(MailboxPath path) throws MailboxException; - void reIndex() throws MailboxException; + Task reIndex() throws MailboxException; } http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java new file mode 100644 index 0000000..196c1d4 --- /dev/null +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java @@ -0,0 +1,79 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.mailbox.tools.indexer; + +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; + +public class FullReindexingTask implements Task { + + public static final String FULL_RE_INDEXING = "FullReIndexing"; + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final ReprocessingContext reprocessingContext; + + AdditionalInformation(ReprocessingContext reprocessingContext) { + this.reprocessingContext = reprocessingContext; + } + + public int getSuccessfullyReprocessMailCount() { + return reprocessingContext.successfullyReprocessedMailCount(); + } + + public int getFailedReprocessedMailCount() { + return reprocessingContext.failedReprocessingMailCount(); + } + } + + private final ReIndexerPerformer reIndexerPerformer; + private final AdditionalInformation additionalInformation; + private final ReprocessingContext reprocessingContext; + + @Inject + public FullReindexingTask(ReIndexerPerformer reIndexerPerformer) { + this.reIndexerPerformer = reIndexerPerformer; + this.reprocessingContext = new ReprocessingContext(); + this.additionalInformation = new AdditionalInformation(reprocessingContext); + } + + @Override + public Result run() { + try { + return reIndexerPerformer.reIndex(reprocessingContext); + } catch (MailboxException e) { + return Result.PARTIAL; + } + } + + @Override + public String type() { + return FULL_RE_INDEXING; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(additionalInformation); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java index f239e32..415aacc 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java @@ -19,33 +19,11 @@ package org.apache.mailbox.tools.indexer; -import java.util.List; -import java.util.Optional; - import javax.inject.Inject; -import org.apache.james.mailbox.MailboxManager; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.model.MessageRange; -import org.apache.james.mailbox.store.MailboxSessionMapperFactory; -import org.apache.james.mailbox.store.mail.MessageMapper; -import org.apache.james.mailbox.store.mail.model.Mailbox; -import org.apache.james.mailbox.store.mail.model.MailboxMessage; -import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; -import org.apache.james.util.streams.Iterators; -import org.apache.mailbox.tools.indexer.events.ImpactingEventType; -import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent; -import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration; -import org.apache.mailbox.tools.indexer.registrations.MailboxRegistration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.fge.lambdas.Throwing; -import com.google.common.collect.Lists; +import org.apache.james.task.Task; /** * Note about live re-indexation handling : @@ -61,114 +39,21 @@ import com.google.common.collect.Lists; */ public class ReIndexerImpl implements ReIndexer { - private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerImpl.class); - - private static final int NO_LIMIT = 0; - private static final int SINGLE_MESSAGE = 1; - - private final MailboxManager mailboxManager; - private final ListeningMessageSearchIndex messageSearchIndex; - private final MailboxSessionMapperFactory mailboxSessionMapperFactory; + private final ReIndexerPerformer reIndexerPerformer; @Inject - public ReIndexerImpl(MailboxManager mailboxManager, - ListeningMessageSearchIndex messageSearchIndex, - MailboxSessionMapperFactory mailboxSessionMapperFactory) { - this.mailboxManager = mailboxManager; - this.messageSearchIndex = messageSearchIndex; - this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; + public ReIndexerImpl(ReIndexerPerformer reIndexerPerformer) { + this.reIndexerPerformer = reIndexerPerformer; } @Override - public void reIndex(MailboxPath path) throws MailboxException { - MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser()); - reIndex(path, mailboxSession); + public Task reIndex(MailboxPath path) { + return new SingleMailboxReindexingTask(reIndexerPerformer, path); } - @Override - public void reIndex() throws MailboxException { - MailboxSession mailboxSession = mailboxManager.createSystemSession("re-indexing"); - LOGGER.info("Starting a full reindex"); - List<MailboxPath> mailboxPaths = mailboxManager.list(mailboxSession); - GlobalRegistration globalRegistration = new GlobalRegistration(); - mailboxManager.addGlobalListener(globalRegistration, mailboxSession); - try { - handleFullReindexingIterations(mailboxPaths, globalRegistration); - } finally { - mailboxManager.removeGlobalListener(globalRegistration, mailboxSession); - } - LOGGER.info("Full reindex finished"); - } - - private void handleFullReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration globalRegistration) { - for (MailboxPath mailboxPath : mailboxPaths) { - Optional<MailboxPath> pathToIndex = globalRegistration.getPathToIndex(mailboxPath); - if (pathToIndex.isPresent()) { - try { - reIndex(pathToIndex.get()); - } catch (Throwable e) { - LOGGER.error("Error while proceeding to full reindexing on {}", pathToIndex.get(), e); - } - } - } - } - - - private void reIndex(MailboxPath path, MailboxSession mailboxSession) throws MailboxException { - MailboxRegistration mailboxRegistration = new MailboxRegistration(path); - LOGGER.info("Intend to reindex {}",path); - Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path); - messageSearchIndex.deleteAll(mailboxSession, mailbox); - mailboxManager.addListener(path, mailboxRegistration, mailboxSession); - try { - Iterators.toStream( - mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT)) - .map(MailboxMessage::getUid) - .forEach(uid -> handleMessageReIndexing(mailboxSession, mailboxRegistration, mailbox, uid)); - LOGGER.info("Finish to reindex {}", path); - } finally { - mailboxManager.removeListener(path, mailboxRegistration, mailboxSession); - } - } - - private void handleMessageReIndexing(MailboxSession mailboxSession, MailboxRegistration mailboxRegistration, Mailbox mailbox, MessageUid uid) { - try { - Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(uid)); - - Optional.of(uid) - .filter(x -> !wasDeleted(impactingMessageEvent)) - .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid))) - .map(message -> messageUpdateRegardingEvents(message, impactingMessageEvent)) - .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message))); - } catch (Exception e) { - LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); - } - } - - private Optional<ImpactingMessageEvent> findMostRelevant(List<ImpactingMessageEvent> messageEvents) { - for (ImpactingMessageEvent impactingMessageEvent : messageEvents) { - if (impactingMessageEvent.getType().equals(ImpactingEventType.Deletion)) { - return Optional.of(impactingMessageEvent); - } - } - return Lists.reverse(messageEvents).stream().findFirst(); - } - - private boolean wasDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent) { - return impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false); - } - - private MailboxMessage messageUpdateRegardingEvents(MailboxMessage message, Optional<ImpactingMessageEvent> impactingMessageEvent) { - impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags); - return message; - } - - private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException { - return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)) - .findFirst(); + public Task reIndex() { + return new FullReindexingTask(reIndexerPerformer); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java new file mode 100644 index 0000000..e70b88d --- /dev/null +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -0,0 +1,168 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.mailbox.tools.indexer; + +import java.util.List; +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.mailbox.MailboxManager; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.store.MailboxSessionMapperFactory; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import org.apache.james.task.Task; +import org.apache.james.util.OptionalUtils; +import org.apache.james.util.streams.Iterators; +import org.apache.mailbox.tools.indexer.events.ImpactingEventType; +import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent; +import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration; +import org.apache.mailbox.tools.indexer.registrations.MailboxRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.google.common.collect.Lists; + +public class ReIndexerPerformer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class); + + private static final int NO_LIMIT = 0; + private static final int SINGLE_MESSAGE = 1; + + private final MailboxManager mailboxManager; + private final ListeningMessageSearchIndex messageSearchIndex; + private final MailboxSessionMapperFactory mailboxSessionMapperFactory; + + @Inject + public ReIndexerPerformer(MailboxManager mailboxManager, + ListeningMessageSearchIndex messageSearchIndex, + MailboxSessionMapperFactory mailboxSessionMapperFactory) { + this.mailboxManager = mailboxManager; + this.messageSearchIndex = messageSearchIndex; + this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; + } + + public Task.Result reIndex(MailboxPath path, ReprocessingContext reprocessingContext) throws MailboxException { + MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser()); + return reIndex(path, mailboxSession, reprocessingContext); + } + + public Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException { + MailboxSession mailboxSession = mailboxManager.createSystemSession("re-indexing"); + LOGGER.info("Starting a full reindex"); + List<MailboxPath> mailboxPaths = mailboxManager.list(mailboxSession); + GlobalRegistration globalRegistration = new GlobalRegistration(); + mailboxManager.addGlobalListener(globalRegistration, mailboxSession); + try { + return handleFullReindexingIterations(mailboxPaths, globalRegistration, reprocessingContext); + } finally { + mailboxManager.removeGlobalListener(globalRegistration, mailboxSession); + LOGGER.info("Full reindex finished"); + } + } + + private Task.Result handleFullReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration globalRegistration, + ReprocessingContext reprocessingContext) { + return mailboxPaths.stream() + .map(globalRegistration::getPathToIndex) + .flatMap(OptionalUtils::toStream) + .map(path -> { + try { + return reIndex(path, reprocessingContext); + } catch (Throwable e) { + LOGGER.error("Error while proceeding to full reindexing on {}", path.asString(), e); + return Task.Result.PARTIAL; + } + }) + .reduce(Task::combine) + .orElse(Task.Result.COMPLETED); + } + + private Task.Result reIndex(MailboxPath path, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) throws MailboxException { + MailboxRegistration mailboxRegistration = new MailboxRegistration(path); + LOGGER.info("Intend to reindex {}", path.asString()); + Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path); + messageSearchIndex.deleteAll(mailboxSession, mailbox); + mailboxManager.addListener(path, mailboxRegistration, mailboxSession); + try { + return Iterators.toStream( + mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT)) + .map(MailboxMessage::getUid) + .map(uid -> handleMessageReIndexing(mailboxSession, mailboxRegistration, mailbox, uid)) + .peek(reprocessingContext::updateAccordingToReprocessingResult) + .reduce(Task::combine) + .orElse(Task.Result.COMPLETED); + } finally { + LOGGER.info("Finish to reindex {}", path.asString()); + mailboxManager.removeListener(path, mailboxRegistration, mailboxSession); + } + } + + private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, MailboxRegistration mailboxRegistration, Mailbox mailbox, MessageUid uid) { + try { + Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(uid)); + + Optional.of(uid) + .filter(x -> !wasDeleted(impactingMessageEvent)) + .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid))) + .map(message -> messageUpdateRegardingEvents(message, impactingMessageEvent)) + .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message))); + return Task.Result.COMPLETED; + } catch (Exception e) { + LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); + return Task.Result.PARTIAL; + } + } + + private Optional<ImpactingMessageEvent> findMostRelevant(List<ImpactingMessageEvent> messageEvents) { + for (ImpactingMessageEvent impactingMessageEvent : messageEvents) { + if (impactingMessageEvent.getType().equals(ImpactingEventType.Deletion)) { + return Optional.of(impactingMessageEvent); + } + } + return Lists.reverse(messageEvents).stream().findFirst(); + } + + private boolean wasDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent) { + return impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false); + } + + private MailboxMessage messageUpdateRegardingEvents(MailboxMessage message, Optional<ImpactingMessageEvent> impactingMessageEvent) { + impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags); + return message; + } + + private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException { + return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)) + .findFirst(); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java new file mode 100644 index 0000000..08417cf --- /dev/null +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.mailbox.tools.indexer; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.james.task.Task; + +public class ReprocessingContext { + private final AtomicInteger successfullyReprocessedMails; + private final AtomicInteger failedReprocessingMails; + + public ReprocessingContext() { + failedReprocessingMails = new AtomicInteger(0); + successfullyReprocessedMails = new AtomicInteger(0); + } + + public void updateAccordingToReprocessingResult(Task.Result result) { + switch (result) { + case COMPLETED: + successfullyReprocessedMails.incrementAndGet(); + break; + case PARTIAL: + failedReprocessingMails.incrementAndGet(); + break; + } + } + + public int successfullyReprocessedMailCount() { + return successfullyReprocessedMails.get(); + } + + public int failedReprocessingMailCount() { + return failedReprocessingMails.get(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java new file mode 100644 index 0000000..fc9f834 --- /dev/null +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java @@ -0,0 +1,88 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.mailbox.tools.indexer; + +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; + +public class SingleMailboxReindexingTask implements Task { + + public static final String MAILBOX_RE_INDEXING = "mailboxReIndexing"; + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final MailboxPath mailboxPath; + private final ReprocessingContext reprocessingContext; + + AdditionalInformation(MailboxPath mailboxPath, ReprocessingContext reprocessingContext) { + this.mailboxPath = mailboxPath; + this.reprocessingContext = reprocessingContext; + } + + public String getMailboxPath() { + return mailboxPath.asString(); + } + + public int getSuccessfullyReprocessMailCount() { + return reprocessingContext.successfullyReprocessedMailCount(); + } + + public int getFailedReprocessedMailCount() { + return reprocessingContext.failedReprocessingMailCount(); + } + } + + private final ReIndexerPerformer reIndexerPerformer; + private final MailboxPath path; + private final AdditionalInformation additionalInformation; + private final ReprocessingContext reprocessingContext; + + @Inject + public SingleMailboxReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxPath path) { + this.reIndexerPerformer = reIndexerPerformer; + this.path = path; + this.reprocessingContext = new ReprocessingContext(); + this.additionalInformation = new AdditionalInformation(path, reprocessingContext); + } + + @Override + public Result run() { + try { + return reIndexerPerformer.reIndex(path, reprocessingContext); + } catch (MailboxException e) { + return Result.PARTIAL; + } + } + + @Override + public String type() { + return MAILBOX_RE_INDEXING; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(additionalInformation); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java index 462537b..6734795 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java @@ -22,16 +22,17 @@ package org.apache.mailbox.tools.indexer; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.task.Task; public class ThrowsReIndexer implements ReIndexer { @Override - public void reIndex(MailboxPath path) throws MailboxException { + public Task reIndex(MailboxPath path) throws MailboxException { throw new MailboxException("Not implemented"); } @Override - public void reIndex() throws MailboxException { + public Task reIndex() throws MailboxException { throw new MailboxException("Not implemented"); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java index 26f68cc..6c55478 100644 --- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java +++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java @@ -64,7 +64,7 @@ public class CassandraReIndexerImplTest { mailboxManager = CassandraMailboxManagerProvider.provideMailboxManager(cassandra.getConf(), cassandra.getTypesProvider()); MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory(); messageSearchIndex = mock(ListeningMessageSearchIndex.class); - reIndexer = new ReIndexerImpl(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory); + reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory)); } @Test @@ -88,7 +88,7 @@ public class CassandraReIndexerImplTest { .runSuccessfullyWithin(Duration.ofMinutes(10)); // When We re-index - reIndexer.reIndex(INBOX); + reIndexer.reIndex(INBOX).run(); // The indexer is called for each message verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(Mailbox.class)); http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java index 76f8a00..003601e 100644 --- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java +++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java @@ -57,7 +57,7 @@ public class ReIndexerImplTest { mailboxManager = new InMemoryIntegrationResources().createMailboxManager(new SimpleGroupMembershipResolver()); MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory(); messageSearchIndex = mock(ListeningMessageSearchIndex.class); - reIndexer = new ReIndexerImpl(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory); + reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory)); } @Test @@ -69,7 +69,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession); - reIndexer.reIndex(INBOX); + reIndexer.reIndex(INBOX).run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<Mailbox> mailboxCaptor1 = ArgumentCaptor.forClass(Mailbox.class); @@ -94,7 +94,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession); - reIndexer.reIndex(); + reIndexer.reIndex().run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<Mailbox> mailboxCaptor1 = ArgumentCaptor.forClass(Mailbox.class); ArgumentCaptor<Mailbox> mailboxCaptor2 = ArgumentCaptor.forClass(Mailbox.class); http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java ---------------------------------------------------------------------- diff --git a/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java b/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java index bccaa62..6d81781 100644 --- a/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java +++ b/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java @@ -19,8 +19,10 @@ package org.apache.james.cli; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.james.GuiceJamesServer; import org.apache.james.MemoryJmapTestRule; @@ -29,6 +31,7 @@ import org.apache.james.mailbox.model.MailboxConstants; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.modules.server.JMXServerModule; +import org.apache.james.task.Task; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -37,7 +40,7 @@ import org.junit.Test; import com.google.inject.name.Names; public class ReindexCommandIntegrationTest { - public static final String USER = "user"; + private static final String USER = "user"; private ReIndexer reIndexer; @Rule @@ -47,6 +50,8 @@ public class ReindexCommandIntegrationTest { @Before public void setUp() throws Exception { reIndexer = mock(ReIndexer.class); + when(reIndexer.reIndex()).thenReturn(() -> Task.Result.COMPLETED); + when(reIndexer.reIndex(any(MailboxPath.class))).thenReturn(() -> Task.Result.COMPLETED); guiceJamesServer = memoryJmap.jmapServer(new JMXServerModule(), binder -> binder.bind(ListeningMessageSearchIndex.class).toInstance(mock(ListeningMessageSearchIndex.class))) .overrideWith(binder -> binder.bind(ReIndexer.class) http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java ---------------------------------------------------------------------- diff --git a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java index c5110a8..760b3d9 100644 --- a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java +++ b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java @@ -28,14 +28,18 @@ import javax.inject.Named; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.task.TaskId; +import org.apache.james.task.TaskManager; import org.apache.james.util.MDCBuilder; public class ReIndexerManagement implements ReIndexerManagementMBean { - private ReIndexer reIndexer; + private final TaskManager taskManager; + private final ReIndexer reIndexer; @Inject - public void setReIndexer(@Named("reindexer") ReIndexer reIndexer) { + public ReIndexerManagement(TaskManager taskManager, @Named("reindexer") ReIndexer reIndexer) { + this.taskManager = taskManager; this.reIndexer = reIndexer; } @@ -46,7 +50,8 @@ public class ReIndexerManagement implements ReIndexerManagementMBean { .addContext(MDCBuilder.PROTOCOL, "CLI") .addContext(MDCBuilder.ACTION, "reIndex") .build()) { - reIndexer.reIndex(new MailboxPath(namespace, user, name)); + TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, user, name))); + taskManager.await(taskId); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml ---------------------------------------------------------------------- diff --git a/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml b/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml index fa12ef6..aab5c34 100644 --- a/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml +++ b/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml @@ -287,6 +287,9 @@ <bean id="quotamanagermanagement" class="org.apache.james.adapter.mailbox.QuotaManagement"/> <bean id="reindexermanagement" class="org.apache.james.adapter.mailbox.ReIndexerManagement"/> <bean id="sievemanagerbean" class="org.apache.james.sieverepository.lib.SieveRepositoryManagement"/> + + + <bean id="taskManager" class="org.apache.james.task.MemoryTaskManager"/> <!-- <bean id="james23importermanagement" class="org.apache.james.container.spring.tool.James23ImporterManagement" /> --> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
