chibenwa commented on code in PR #1004:
URL: https://github.com/apache/james-project/pull/1004#discussion_r874978669


##########
mailet/standard/src/main/java/org/apache/james/transport/mailets/Expires.java:
##########
@@ -0,0 +1,144 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.io.StringReader;
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.mime4j.dom.datetime.DateTime;
+import org.apache.james.mime4j.field.datetime.parser.DateTimeParser;
+import org.apache.james.mime4j.field.datetime.parser.ParseException;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.DateFormats;
+import org.apache.mailet.base.GenericMailet;
+
+
+/**
+ * <p>Adds an Expires header to the message, or enforces the period of an 
existing one.</p>
+ *
+ * <p>Sample configuration:</p>
+ *
+ * <pre><code>
+ * &lt;mailet match="All" class="Expires"&gt;
+ *   &lt;minAge&gt;1d&lt;/minAge&gt;
+ *   &lt;maxAge&gt;1w&lt;/maxAge&gt;
+ *   &lt;defaultAge&gt;4w&lt;/defaultAge&gt;
+ * &lt;/mailet&gt;
+ * </code></pre>
+ *
+ * @version 1.0.0, 2021-12-14
+ */
+public class Expires extends GenericMailet {
+    
+    public static final String EXPIRES = "Expires";
+    
+    Supplier<ZonedDateTime> timeSource = ZonedDateTime::now; 
+
+    private Optional<Duration> minAge = Optional.empty();
+    private Optional<Duration> maxAge = Optional.empty();
+    private Optional<Duration> defaultAge = Optional.empty();
+    
+    @Override
+    public void init() throws MessagingException {
+        minAge = parseDuration("minAge");
+        maxAge = parseDuration("maxAge");
+        defaultAge = parseDuration("defaultAge");
+
+        if (minAge.isEmpty() && maxAge.isEmpty() && defaultAge.isEmpty()) {
+            throw new MessagingException("Please configure at least one of 
minAge, maxAge, defaultAge");
+        }
+
+        if (isAfter(minAge, maxAge)) {
+            throw new MessagingException("minAge must be before maxAge");
+        }
+        if (isAfter(defaultAge, maxAge)) {
+            throw new MessagingException("defaultAge must be before maxAge");
+        }
+        if (isAfter(minAge, defaultAge)) {
+            throw new MessagingException("minAge must be before defaultAge");
+        }
+    }
+
+    @Override
+    public void service(Mail mail) throws MessagingException {
+        ZonedDateTime now = timeSource.get();
+        MimeMessage message = mail.getMessage();
+        Optional<ZonedDateTime> expires = parseExpiresHeader(message);
+        if (expires.isPresent()) {
+            if (minAge.isPresent() && 
expires.get().isBefore(now.plus(minAge.get()))) {
+                setExpiresAfter(message, now, minAge.get());
+            } else
+            if (maxAge.isPresent() && 
expires.get().isAfter(now.plus(maxAge.get()))) {
+                setExpiresAfter(message, now, maxAge.get());
+            }
+        } else if (defaultAge.isPresent()) {
+            setExpiresAfter(message, now, defaultAge.get());
+        }
+    }
+
+    @Override
+    public String getMailetInfo() {
+        return "Expire Mailet";
+    }
+
+    private Optional<Duration> parseDuration(String param) {
+        String duration = getInitParameter(param);
+        if (duration == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(DurationParser.parse(duration, 
ChronoUnit.DAYS));
+        }
+    }
+    
+    private boolean isAfter(Optional<Duration> a, Optional<Duration> b) {
+        return a.isPresent() && b.isPresent() && a.get().compareTo(b.get()) > 
0;
+    }
+    
+    private Optional<ZonedDateTime> parseExpiresHeader(MimeMessage message) {
+        try {
+            String[] expires = message.getHeader(EXPIRES);
+            if (expires == null || expires.length == 0) {
+                return Optional.empty();
+            } else {
+                DateTime dt = new DateTimeParser(new 
StringReader(expires[0])).parseAll();
+                return Optional.of(ZonedDateTime.of(
+                    dt.getYear(), dt.getMonth(), dt.getDay(),
+                    dt.getHour(), dt.getMinute(), dt.getSecond(), 0,
+                    ZoneOffset.ofHoursMinutes(dt.getTimeZone() / 100, 
dt.getTimeZone() % 100)));

Review Comment:
   Calendars / dates is a complicated topic that we likely should not 
under-estimate. This is complex and may be subject to subtil edge cases. I 
would be careful. Moreover DateTime entity is untested in MIME4J and not used 
in relevant places.
   
   I would likely refrain from using it.
   
   A more classic approach would be to rely on 
`DateTimeFieldLenientImpl.PARSER` that is easy to use, well tested and moreover 
is polite enough to return a Date that is easy to convert into a ZoneDateTime 
without taking any risk.
   
   I would encourage to migrate this piece of code to use it.



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesService.java:
##########
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.SearchQuery.DateResolution;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+public class ExpireInboxesService {
+
+    public static class RunningOptions {
+        public static final RunningOptions DEFAULT = new RunningOptions(1, 
Optional.empty());
+
+        private final int usersPerSecond;
+
+        private final Optional<String> maxAge;
+
+        @JsonIgnore
+        private final Optional<Duration> maxAgeDuration;
+
+        @JsonCreator
+        public RunningOptions(@JsonProperty("usersPerSecond") int 
usersPerSecond,
+                              @JsonProperty("maxAge") Optional<String> maxAge) 
{
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' 
needs to be strictly positive");
+            this.usersPerSecond = usersPerSecond;
+    
+            this.maxAge = maxAge;
+            this.maxAgeDuration = maxAge.map(v -> {
+                Duration maxAgeDuration = DurationParser.parse(maxAge.get(), 
ChronoUnit.DAYS);
+                Preconditions.checkArgument(!maxAgeDuration.isNegative(), 
"'maxAge' must be positive");
+                return maxAgeDuration;
+            });
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+
+        public Optional<String> getMaxAge() {
+            return maxAge;
+        }
+    }
+
+    public static class Context {
+        private final AtomicLong inboxesExpired;
+        private final AtomicLong inboxesFailed;
+        private final AtomicLong inboxesProcessed;
+
+        public Context() {
+            this.inboxesExpired = new AtomicLong(0L);
+            this.inboxesFailed = new AtomicLong(0L);
+            this.inboxesProcessed = new AtomicLong(0L);
+        }
+
+        public long getInboxesExpired() {
+            return inboxesExpired.get();
+        }
+
+        public long getInboxesFailed() {
+            return inboxesFailed.get();
+        }
+
+        public long getInboxesProcessed() {
+            return inboxesProcessed.get();
+        }
+
+        public void incrementExpiredCount() {
+            inboxesExpired.incrementAndGet();
+        }
+
+        public void incrementFailedCount() {
+            inboxesFailed.incrementAndGet();
+        }
+
+        public void incrementProcessedCount() {
+            inboxesProcessed.incrementAndGet();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpireInboxesService.class);
+
+    private final UsersRepository usersRepository;
+    private final MailboxManager mailboxManager;
+
+    @Inject
+    public ExpireInboxesService(UsersRepository usersRepository, 
MailboxManager mailboxManager) {
+        this.usersRepository = usersRepository;
+        this.mailboxManager = mailboxManager;
+    }
+
+    public Mono<Result> expireInboxes(Context context, RunningOptions 
runningOptions, Date now) {
+        try {
+            LOGGER.info("expire with maxAge {} = {}", runningOptions.maxAge, 
runningOptions.maxAgeDuration);
+            SearchQuery expiration = SearchQuery.of(
+                runningOptions.maxAgeDuration.map(maxAge -> {
+                        Date limit = Date.from(now.toInstant().minus(maxAge));
+                        return SearchQuery.internalDateBefore(limit, 
DateResolution.Second);
+                    })
+                    .orElse(
+                        SearchQuery.headerDateBefore("Expires", now, 
DateResolution.Second)
+                    )
+            );
+            return Iterators.toFlux(usersRepository.list())
+                .transform(ReactorUtils.<Username, Task.Result>throttle()
+                    .elements(runningOptions.getUsersPerSecond())
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(username -> expireUserInbox(context, 
username, expiration)))

Review Comment:
   Coming from a POP3 world only the INBOX exists but on the outer world there 
are protocols allowing to access a miriad of mailboxes.
   
   I propose to instead use a multi mailbox search query to apply the deletion 
accross all mailboxes of the account.



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesService.java:
##########
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.SearchQuery.DateResolution;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+public class ExpireInboxesService {
+
+    public static class RunningOptions {
+        public static final RunningOptions DEFAULT = new RunningOptions(1, 
Optional.empty());
+
+        private final int usersPerSecond;
+
+        private final Optional<String> maxAge;
+
+        @JsonIgnore
+        private final Optional<Duration> maxAgeDuration;
+
+        @JsonCreator
+        public RunningOptions(@JsonProperty("usersPerSecond") int 
usersPerSecond,
+                              @JsonProperty("maxAge") Optional<String> maxAge) 
{
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' 
needs to be strictly positive");
+            this.usersPerSecond = usersPerSecond;
+    
+            this.maxAge = maxAge;
+            this.maxAgeDuration = maxAge.map(v -> {
+                Duration maxAgeDuration = DurationParser.parse(maxAge.get(), 
ChronoUnit.DAYS);
+                Preconditions.checkArgument(!maxAgeDuration.isNegative(), 
"'maxAge' must be positive");
+                return maxAgeDuration;
+            });
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+
+        public Optional<String> getMaxAge() {
+            return maxAge;
+        }
+    }
+
+    public static class Context {
+        private final AtomicLong inboxesExpired;
+        private final AtomicLong inboxesFailed;
+        private final AtomicLong inboxesProcessed;
+
+        public Context() {
+            this.inboxesExpired = new AtomicLong(0L);
+            this.inboxesFailed = new AtomicLong(0L);
+            this.inboxesProcessed = new AtomicLong(0L);
+        }
+
+        public long getInboxesExpired() {
+            return inboxesExpired.get();
+        }
+
+        public long getInboxesFailed() {
+            return inboxesFailed.get();
+        }
+
+        public long getInboxesProcessed() {
+            return inboxesProcessed.get();
+        }
+
+        public void incrementExpiredCount() {
+            inboxesExpired.incrementAndGet();
+        }
+
+        public void incrementFailedCount() {
+            inboxesFailed.incrementAndGet();
+        }
+
+        public void incrementProcessedCount() {
+            inboxesProcessed.incrementAndGet();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpireInboxesService.class);
+
+    private final UsersRepository usersRepository;
+    private final MailboxManager mailboxManager;
+
+    @Inject
+    public ExpireInboxesService(UsersRepository usersRepository, 
MailboxManager mailboxManager) {
+        this.usersRepository = usersRepository;
+        this.mailboxManager = mailboxManager;
+    }
+
+    public Mono<Result> expireInboxes(Context context, RunningOptions 
runningOptions, Date now) {
+        try {
+            LOGGER.info("expire with maxAge {} = {}", runningOptions.maxAge, 
runningOptions.maxAgeDuration);
+            SearchQuery expiration = SearchQuery.of(
+                runningOptions.maxAgeDuration.map(maxAge -> {
+                        Date limit = Date.from(now.toInstant().minus(maxAge));
+                        return SearchQuery.internalDateBefore(limit, 
DateResolution.Second);
+                    })
+                    .orElse(
+                        SearchQuery.headerDateBefore("Expires", now, 
DateResolution.Second)

Review Comment:
   Not all search engines implementations supports date search on arbitrary 
headers.
   
   This would require to parse and index such headers as date. Lucene and 
ElasticSearch implems don't do this and treat it as text.
   
   Thus this `Expires` header stuff only works with the scanning search.
   
   This should be at least documented.



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesTask.java:
##########
@@ -0,0 +1,124 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.webadmin.service.ExpireInboxesService.Context;
+import org.apache.james.webadmin.service.ExpireInboxesService.RunningOptions;
+
+import reactor.core.scheduler.Schedulers;
+
+public class ExpireInboxesTask implements Task {
+    public static final TaskType TASK_TYPE = TaskType.of("ExpireInboxesTask");
+
+    public static class AdditionalInformation implements 
TaskExecutionDetails.AdditionalInformation {
+
+        public static ExpireInboxesTask.AdditionalInformation from(Context 
context,
+                                                                   
RunningOptions runningOptions) {
+            return new ExpireInboxesTask.AdditionalInformation(
+                Clock.systemUTC().instant(),
+                runningOptions,
+                context.getInboxesExpired(),
+                context.getInboxesFailed(),
+                context.getInboxesProcessed());
+        }
+
+        private final Instant timestamp;
+        private final RunningOptions runningOptions;
+        private final long inboxesExpired;
+        private final long inboxesFailed;
+        private final long inboxesProcessed;
+
+        public AdditionalInformation(Instant timestamp,
+                                     RunningOptions runningOptions,
+                                     long inboxesExpired,
+                                     long inboxesFailed,
+                                     long inboxesProcessed) {
+            this.timestamp = timestamp;
+            this.runningOptions = runningOptions;
+            this.inboxesExpired = inboxesExpired;
+            this.inboxesFailed = inboxesFailed;
+            this.inboxesProcessed = inboxesProcessed;
+        }
+
+        public RunningOptions getRunningOptions() {
+            return runningOptions;
+        }
+
+        public long getInboxesExpired() {
+            return inboxesExpired;
+        }
+
+        public long getInboxesFailed() {
+            return inboxesFailed;
+        }
+
+        public long getInboxesProcessed() {
+            return inboxesProcessed;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+    }
+
+    private final ExpireInboxesService expireInboxesService;
+    private final Context context;
+    private final RunningOptions runningOptions;
+
+    @Inject
+    public ExpireInboxesTask(ExpireInboxesService expireInboxesService,
+                             RunningOptions runningOptions) {
+        this.expireInboxesService = expireInboxesService;
+        this.context = new Context();
+        this.runningOptions = runningOptions;
+    }
+
+    @Override
+    public Result run() {
+        return expireInboxesService.expireInboxes(context, runningOptions, new 
Date())
+            .subscribeOn(Schedulers.elastic())

Review Comment:
   We already have a non blocking thread to execute this very reactor pipeline 
so why ask an elastic thread to do the job then wait the elastic thread to 
finish?
   
   Removing subscribesOn before .block is a performance oriented refactoring I 
carried over a few weeks ago.



##########
server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireInboxesServiceTest.java:
##########
@@ -0,0 +1,219 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Task;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

Review Comment:
   Sorry I am uneasy with mocks.
   
   They make refactoring a nightmare. They encode everywhere the inner working 
of the dependencies and breaks encapsulation.
   
   I suggest to rely on InMemoryTestRessource to supply a mailbox manager ready 
to use.



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesAdditionalInformationDTO.java:
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.webadmin.service.ExpireInboxesService.RunningOptions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ExpireInboxesAdditionalInformationDTO implements 
AdditionalInformationDTO {
+
+    public static 
AdditionalInformationDTOModule<ExpireInboxesTask.AdditionalInformation, 
ExpireInboxesAdditionalInformationDTO> module() {
+        return 
DTOModule.forDomainObject(ExpireInboxesTask.AdditionalInformation.class)
+            .convertToDTO(ExpireInboxesAdditionalInformationDTO.class)
+            
.toDomainObjectConverter(ExpireInboxesAdditionalInformationDTO::toDomainObject)
+            .toDTOConverter(ExpireInboxesAdditionalInformationDTO::toDto)
+            .typeName(ExpireInboxesTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+    }
+
+    private static ExpireInboxesTask.AdditionalInformation 
toDomainObject(ExpireInboxesAdditionalInformationDTO dto) {
+        return new ExpireInboxesTask.AdditionalInformation(
+            dto.getTimestamp(),
+            dto.getRunningOptions(),
+            dto.getInboxesExpired(),
+            dto.getInboxesFailed(),
+            dto.getInboxesProcessed());
+    }
+
+    private static ExpireInboxesAdditionalInformationDTO 
toDto(ExpireInboxesTask.AdditionalInformation details, String type) {
+        return new ExpireInboxesAdditionalInformationDTO(
+            details.timestamp(),
+            type,
+            details.getRunningOptions(),
+            details.getInboxesExpired(),
+            details.getInboxesFailed(),
+            details.getInboxesProcessed());
+    }
+
+    private final Instant timestamp;
+    private final String type;
+    private final RunningOptions runningOptions;
+    private final long inboxesExpired;
+    private final long inboxesFailed;
+    private final long inboxesProcessed;
+
+    @JsonCreator
+    public ExpireInboxesAdditionalInformationDTO(@JsonProperty("timestamp") 
Instant timestamp,
+                                                 @JsonProperty("type") String 
type,
+                                                 
@JsonProperty("runningOptions") RunningOptions runningOptions,
+                                                 
@JsonProperty("inboxesExpired") long inboxesExpired,
+                                                 
@JsonProperty("inboxesFailed") long inboxesFailed,
+                                                 
@JsonProperty("inboxesProcessed") long inboxesProcessed) {
+        this.timestamp = timestamp;
+        this.type = type;
+        this.runningOptions = runningOptions;
+        this.inboxesExpired = inboxesExpired;
+        this.inboxesFailed = inboxesFailed;
+        this.inboxesProcessed = inboxesProcessed;
+    }
+
+    public RunningOptions getRunningOptions() {
+        return runningOptions;
+    }
+
+    public long getInboxesExpired() {
+        return inboxesExpired;
+    }
+
+    public long getInboxesFailed() {
+        return inboxesFailed;
+    }
+
+    public long getInboxesProcessed() {
+        return inboxesProcessed;
+    }

Review Comment:
   A very interesting metric would be the count of messages that we actually 
deleted...
   
   Maybe possibly also the count of messages we failed to delete.



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesService.java:
##########
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.SearchQuery.DateResolution;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+public class ExpireInboxesService {
+
+    public static class RunningOptions {
+        public static final RunningOptions DEFAULT = new RunningOptions(1, 
Optional.empty());
+
+        private final int usersPerSecond;
+
+        private final Optional<String> maxAge;
+
+        @JsonIgnore
+        private final Optional<Duration> maxAgeDuration;
+
+        @JsonCreator
+        public RunningOptions(@JsonProperty("usersPerSecond") int 
usersPerSecond,
+                              @JsonProperty("maxAge") Optional<String> maxAge) 
{
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' 
needs to be strictly positive");
+            this.usersPerSecond = usersPerSecond;
+    
+            this.maxAge = maxAge;
+            this.maxAgeDuration = maxAge.map(v -> {
+                Duration maxAgeDuration = DurationParser.parse(maxAge.get(), 
ChronoUnit.DAYS);
+                Preconditions.checkArgument(!maxAgeDuration.isNegative(), 
"'maxAge' must be positive");
+                return maxAgeDuration;
+            });
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+
+        public Optional<String> getMaxAge() {
+            return maxAge;
+        }
+    }
+
+    public static class Context {
+        private final AtomicLong inboxesExpired;
+        private final AtomicLong inboxesFailed;
+        private final AtomicLong inboxesProcessed;
+
+        public Context() {
+            this.inboxesExpired = new AtomicLong(0L);
+            this.inboxesFailed = new AtomicLong(0L);
+            this.inboxesProcessed = new AtomicLong(0L);
+        }
+
+        public long getInboxesExpired() {
+            return inboxesExpired.get();
+        }
+
+        public long getInboxesFailed() {
+            return inboxesFailed.get();
+        }
+
+        public long getInboxesProcessed() {
+            return inboxesProcessed.get();
+        }
+
+        public void incrementExpiredCount() {
+            inboxesExpired.incrementAndGet();
+        }
+
+        public void incrementFailedCount() {
+            inboxesFailed.incrementAndGet();
+        }
+
+        public void incrementProcessedCount() {
+            inboxesProcessed.incrementAndGet();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpireInboxesService.class);
+
+    private final UsersRepository usersRepository;
+    private final MailboxManager mailboxManager;
+
+    @Inject
+    public ExpireInboxesService(UsersRepository usersRepository, 
MailboxManager mailboxManager) {
+        this.usersRepository = usersRepository;
+        this.mailboxManager = mailboxManager;
+    }
+
+    public Mono<Result> expireInboxes(Context context, RunningOptions 
runningOptions, Date now) {
+        try {
+            LOGGER.info("expire with maxAge {} = {}", runningOptions.maxAge, 
runningOptions.maxAgeDuration);
+            SearchQuery expiration = SearchQuery.of(
+                runningOptions.maxAgeDuration.map(maxAge -> {
+                        Date limit = Date.from(now.toInstant().minus(maxAge));
+                        return SearchQuery.internalDateBefore(limit, 
DateResolution.Second);
+                    })
+                    .orElse(
+                        SearchQuery.headerDateBefore("Expires", now, 
DateResolution.Second)
+                    )
+            );
+            return Iterators.toFlux(usersRepository.list())
+                .transform(ReactorUtils.<Username, Task.Result>throttle()
+                    .elements(runningOptions.getUsersPerSecond())
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(username -> expireUserInbox(context, 
username, expiration)))
+                .reduce(Task.Result.COMPLETED, Task::combine);
+        } catch (UsersRepositoryException e) {
+            LOGGER.error("Error while accessing users from repository", e);
+            return Mono.just(Task.Result.PARTIAL);
+        }
+    }
+
+    private Mono<Result> expireUserInbox(Context context, Username username, 
SearchQuery expiration) {
+        MailboxSession session = mailboxManager.createSystemSession(username);
+        MailboxPath mailboxPath = MailboxPath.inbox(username);
+        return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, 
session))
+            // newly created users do not have mailboxes yet, just skip them
+            .onErrorResume(MailboxNotFoundException.class, ignore -> 
Mono.empty())
+            .flatMap(mgr -> searchMessagesReactive(mgr, session, expiration)
+                .flatMap(list -> deleteMessagesReactive(mgr, session, list)))
+            .doOnNext(expired -> {
+                if (expired) {
+                    context.incrementExpiredCount();
+                }
+                context.incrementProcessedCount();
+            })
+            .then(Mono.just(Task.Result.COMPLETED))
+            .onErrorResume(e -> {
+                LOGGER.warn("Failed to expire user mailbox {}", username, e);
+                context.incrementFailedCount();
+                context.incrementProcessedCount();
+                return Mono.just(Task.Result.PARTIAL);
+            });
+    }
+
+    private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, 
MailboxSession session, SearchQuery expiration) {
+        try {
+            return Flux.from(mgr.search(expiration, session)).collectList();
+        } catch (MailboxException e) {
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<Boolean> deleteMessagesReactive(MessageManager mgr, 
MailboxSession session, List<MessageUid> uids) {
+        try {
+            if (uids.isEmpty()) {
+                return Mono.just(false);
+            } else {
+                mgr.delete(uids, session);
+                return Mono.just(true);

Review Comment:
   Warning! A blocking call in a reactive pipeline!
   
   This will cause a major performance collapse.
   
   As a rule of thumbs blocking calls needs to be at the very least wrapped:
   
   ```
   Mono.fromRunnable(() -> blocking()).subscribeOn(elastic());
   ```
   
   (Which is better than nothing but still can spawn dozens of threads blocking 
to wait other reactor tasks to complete... So still wasteful!)
   
   Here we could very easily expose a `deleteReactive` method to not even be 
blocking!



##########
server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireInboxesService.java:
##########
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.SearchQuery.DateResolution;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+public class ExpireInboxesService {
+
+    public static class RunningOptions {
+        public static final RunningOptions DEFAULT = new RunningOptions(1, 
Optional.empty());
+
+        private final int usersPerSecond;
+
+        private final Optional<String> maxAge;
+
+        @JsonIgnore
+        private final Optional<Duration> maxAgeDuration;
+
+        @JsonCreator
+        public RunningOptions(@JsonProperty("usersPerSecond") int 
usersPerSecond,
+                              @JsonProperty("maxAge") Optional<String> maxAge) 
{
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' 
needs to be strictly positive");
+            this.usersPerSecond = usersPerSecond;
+    
+            this.maxAge = maxAge;
+            this.maxAgeDuration = maxAge.map(v -> {
+                Duration maxAgeDuration = DurationParser.parse(maxAge.get(), 
ChronoUnit.DAYS);
+                Preconditions.checkArgument(!maxAgeDuration.isNegative(), 
"'maxAge' must be positive");
+                return maxAgeDuration;
+            });
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+
+        public Optional<String> getMaxAge() {
+            return maxAge;
+        }
+    }
+
+    public static class Context {
+        private final AtomicLong inboxesExpired;
+        private final AtomicLong inboxesFailed;
+        private final AtomicLong inboxesProcessed;
+
+        public Context() {
+            this.inboxesExpired = new AtomicLong(0L);
+            this.inboxesFailed = new AtomicLong(0L);
+            this.inboxesProcessed = new AtomicLong(0L);
+        }
+
+        public long getInboxesExpired() {
+            return inboxesExpired.get();
+        }
+
+        public long getInboxesFailed() {
+            return inboxesFailed.get();
+        }
+
+        public long getInboxesProcessed() {
+            return inboxesProcessed.get();
+        }
+
+        public void incrementExpiredCount() {
+            inboxesExpired.incrementAndGet();
+        }
+
+        public void incrementFailedCount() {
+            inboxesFailed.incrementAndGet();
+        }
+
+        public void incrementProcessedCount() {
+            inboxesProcessed.incrementAndGet();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpireInboxesService.class);
+
+    private final UsersRepository usersRepository;
+    private final MailboxManager mailboxManager;
+
+    @Inject
+    public ExpireInboxesService(UsersRepository usersRepository, 
MailboxManager mailboxManager) {
+        this.usersRepository = usersRepository;
+        this.mailboxManager = mailboxManager;
+    }
+
+    public Mono<Result> expireInboxes(Context context, RunningOptions 
runningOptions, Date now) {
+        try {
+            LOGGER.info("expire with maxAge {} = {}", runningOptions.maxAge, 
runningOptions.maxAgeDuration);
+            SearchQuery expiration = SearchQuery.of(
+                runningOptions.maxAgeDuration.map(maxAge -> {
+                        Date limit = Date.from(now.toInstant().minus(maxAge));
+                        return SearchQuery.internalDateBefore(limit, 
DateResolution.Second);
+                    })
+                    .orElse(
+                        SearchQuery.headerDateBefore("Expires", now, 
DateResolution.Second)
+                    )
+            );
+            return Iterators.toFlux(usersRepository.list())
+                .transform(ReactorUtils.<Username, Task.Result>throttle()
+                    .elements(runningOptions.getUsersPerSecond())
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(username -> expireUserInbox(context, 
username, expiration)))
+                .reduce(Task.Result.COMPLETED, Task::combine);
+        } catch (UsersRepositoryException e) {
+            LOGGER.error("Error while accessing users from repository", e);
+            return Mono.just(Task.Result.PARTIAL);
+        }
+    }
+
+    private Mono<Result> expireUserInbox(Context context, Username username, 
SearchQuery expiration) {
+        MailboxSession session = mailboxManager.createSystemSession(username);
+        MailboxPath mailboxPath = MailboxPath.inbox(username);
+        return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, 
session))
+            // newly created users do not have mailboxes yet, just skip them
+            .onErrorResume(MailboxNotFoundException.class, ignore -> 
Mono.empty())
+            .flatMap(mgr -> searchMessagesReactive(mgr, session, expiration)
+                .flatMap(list -> deleteMessagesReactive(mgr, session, list)))
+            .doOnNext(expired -> {
+                if (expired) {
+                    context.incrementExpiredCount();
+                }
+                context.incrementProcessedCount();
+            })
+            .then(Mono.just(Task.Result.COMPLETED))
+            .onErrorResume(e -> {
+                LOGGER.warn("Failed to expire user mailbox {}", username, e);
+                context.incrementFailedCount();
+                context.incrementProcessedCount();
+                return Mono.just(Task.Result.PARTIAL);
+            });
+    }
+
+    private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, 
MailboxSession session, SearchQuery expiration) {
+        try {
+            return Flux.from(mgr.search(expiration, session)).collectList();

Review Comment:
   Why do we need to collectList ?
   
   This seems like an uneeded costly overhead. Why not keep the Flux?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to