This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 98f7c13d5342ab3050f9a4ed88aba90a81b6521f Author: Benoit Tellier <[email protected]> AuthorDate: Tue Nov 17 17:03:37 2020 +0700 JAMES-3440 Utility to populate EmailQueryView --- .../data/jmap/EmailQueryViewPopulator.java | 209 +++++++++++++++++++++ 1 file changed, 209 insertions(+) diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java new file mode 100644 index 0000000..2141fbd --- /dev/null +++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java @@ -0,0 +1,209 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.data.jmap; + +import static org.apache.james.mailbox.MailboxManager.MailboxSearchFetchType.Minimal; + +import java.io.IOException; +import java.time.Duration; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; + +import org.apache.james.jmap.api.projections.EmailQueryView; +import org.apache.james.mailbox.MailboxManager; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageManager; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.FetchGroup; +import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.model.MailboxMetaData; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.MessageResult; +import org.apache.james.mailbox.model.search.MailboxQuery; +import org.apache.james.mime4j.dom.Message; +import org.apache.james.mime4j.stream.MimeConfig; +import org.apache.james.task.Task; +import org.apache.james.task.Task.Result; +import org.apache.james.user.api.UsersRepository; +import org.apache.james.user.api.UsersRepositoryException; +import org.apache.james.util.ReactorUtils; +import org.apache.james.util.streams.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class EmailQueryViewPopulator { + private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class); + private static final Duration PERIOD = Duration.ofSeconds(1); + public static final int USER_CONCURRENCY = 1; + public static final int MAILBOX_CONCURRENCY = 1; + + static class Progress { + private final AtomicLong processedUserCount; + private final AtomicLong processedMessageCount; + private final AtomicLong failedUserCount; + private final AtomicLong failedMessageCount; + + Progress() { + failedUserCount = new AtomicLong(); + processedMessageCount = new AtomicLong(); + processedUserCount = new AtomicLong(); + failedMessageCount = new AtomicLong(); + } + + private void incrementProcessedUserCount() { + processedUserCount.incrementAndGet(); + } + + private void incrementProcessedMessageCount() { + processedMessageCount.incrementAndGet(); + } + + private void incrementFailedUserCount() { + failedUserCount.incrementAndGet(); + } + + private void incrementFailedMessageCount() { + failedMessageCount.incrementAndGet(); + } + + long getProcessedUserCount() { + return processedUserCount.get(); + } + + long getProcessedMessageCount() { + return processedMessageCount.get(); + } + + long getFailedUserCount() { + return failedUserCount.get(); + } + + long getFailedMessageCount() { + return failedMessageCount.get(); + } + } + + private final UsersRepository usersRepository; + private final MailboxManager mailboxManager; + private final EmailQueryView emailQueryView; + + @Inject + EmailQueryViewPopulator(UsersRepository usersRepository, + MailboxManager mailboxManager, + EmailQueryView emailQueryView) { + this.usersRepository = usersRepository; + this.mailboxManager = mailboxManager; + this.emailQueryView = emailQueryView; + } + + Mono<Result> populateView(Progress progress, RunningOptions runningOptions) { + return correctProjection(listAllMailboxMessages(progress), runningOptions, progress); + } + + private Flux<MessageResult> listAllMailboxMessages(Progress progress) { + try { + return Iterators.toFlux(usersRepository.list()) + .map(mailboxManager::createSystemSession) + .doOnNext(any -> progress.incrementProcessedUserCount()) + .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY); + } catch (UsersRepositoryException e) { + return Flux.error(e); + } + } + + private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) { + return listUsersMailboxes(session) + .flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata), MAILBOX_CONCURRENCY) + .flatMap(Throwing.function(messageManager -> listAllMessages(messageManager, session)), MAILBOX_CONCURRENCY) + .onErrorResume(MailboxException.class, e -> { + LOGGER.error("JMAP emailQuery view re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e); + progress.incrementFailedUserCount(); + return Flux.empty(); + }); + } + + private Mono<Result> correctProjection(MessageResult messageResult, Progress progress) { + return Mono.fromCallable(() -> { + MailboxId mailboxId = messageResult.getMailboxId(); + MessageId messageId = messageResult.getMessageId(); + ZonedDateTime receivedAt = ZonedDateTime.ofInstant(messageResult.getInternalDate().toInstant(), ZoneOffset.UTC); + Message mime4JMessage = parseMessage(messageResult); + Date sentAtDate = Optional.ofNullable(mime4JMessage.getDate()).orElse(messageResult.getInternalDate()); + ZonedDateTime sentAt = ZonedDateTime.ofInstant(sentAtDate.toInstant(), ZoneOffset.UTC); + + return new EmailQueryView.Entry(mailboxId, messageId, sentAt, receivedAt); + }) + .flatMap(entry -> emailQueryView.save(entry.getMailboxId(), entry.getSentAt(), entry.getReceivedAt(), entry.getMessageId())) + .thenReturn(Result.COMPLETED) + .doOnSuccess(any -> progress.incrementProcessedMessageCount()) + .onErrorResume(e -> { + LOGGER.error("JMAP emailQuery view re-computation aborted for {} - {} - {}", + messageResult.getMailboxId(), + messageResult.getMessageId(), + messageResult.getUid(), e); + progress.incrementFailedMessageCount(); + return Mono.just(Result.PARTIAL); + }); + } + + private Mono<Result> correctProjection(Flux<MessageResult> entries, RunningOptions runningOptions, Progress progress) { + return entries.transform(ReactorUtils.<MessageResult, Result>throttle() + .elements(runningOptions.getMessagesPerSecond()) + .per(PERIOD) + .forOperation(entry -> correctProjection(entry, progress))) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) { + return mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session); + } + + private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) { + return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session)); + } + + private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) { + try { + return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session)); + } catch (MailboxException e) { + return Flux.error(e); + } + } + + private Message parseMessage(MessageResult messageResult) throws IOException, MailboxException { + return Message.Builder + .of() + .use(MimeConfig.PERMISSIVE) + .parse(messageResult.getFullContent().getInputStream()) + .build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
