This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.8.x by this push:
     new 7bd5b0e65f JAMES-4041 Fix OOM upon IMAP COPY
7bd5b0e65f is described below

commit 7bd5b0e65f95eb9441f178957bd9940bdbcf2c97
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 30 09:45:47 2024 +0200

    JAMES-4041 Fix OOM upon IMAP COPY
    
    The issue arose from command lines like:
    
    ```
    a0 COPY 1:4294967295 otherMailbox
    ```
    
    This is semantically valid, but with a batch size of 10 it caused the
    batching algorithm to generate 429,496,730 ranges. With a memory overhead
    of ~52 bytes per range we would occupy ~20GB, hence the OOM.
    
    The root cause lies in the MUA using an arbitrarily large range instead of:
    
    ```
    a0 COPY 1:* otherMailbox
    ```
    
    We believe the batching semantics are also incorrect. Browsing large
    amounts of metadata is actually not an issue, but the post-processing
    might be.
    
    We thus:
     - First consolidate the range against the actual messages in the store
     - Window it using the batch size.
     - And finally execute like before the copy/move logic.
    
    This avoids generating a large amount of potentially empty ranges.
    
    Furthermore, MessageBatcher is removed as it is no longer used.
---
 .../james/mailbox/store/JVMMailboxPathLocker.java  |  5 +-
 .../mailbox/store/MailboxManagerConfiguration.java | 16 -----
 .../apache/james/mailbox/store/MessageBatcher.java | 77 ---------------------
 .../james/mailbox/store/StoreMailboxManager.java   | 24 +++----
 .../james/mailbox/store/StoreMessageManager.java   | 70 ++++++++++---------
 .../james/mailbox/store/MessageBatcherTest.java    | 78 ----------------------
 6 files changed, 52 insertions(+), 218 deletions(-)

diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
index 987eeb64d9..4be417d491 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
@@ -33,6 +33,7 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.reactivestreams.Publisher;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
@@ -65,12 +66,12 @@ public final class JVMMailboxPathLocker implements 
MailboxPathLocker {
         StampedLock stampedLock = getStampedLock(path);
         switch (lockType) {
             case Read:
-                return Mono.using(stampedLock::readLock,
+                return Flux.using(stampedLock::readLock,
                         stamp -> Mono.from(execution),
                         stampedLock::unlockRead)
                     .subscribeOn(LOCKER_WRAPPER);
             case Write:
-                return Mono.using(stampedLock::writeLock,
+                return Flux.using(stampedLock::writeLock,
                         stamp -> Mono.from(execution),
                         stampedLock::unlockWrite)
                     .subscribeOn(LOCKER_WRAPPER);
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
index 5a7e5babd0..ecd16837ab 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
@@ -24,14 +24,6 @@ import javax.inject.Inject;
 public class MailboxManagerConfiguration {
     public static final MailboxManagerConfiguration DEFAULT = new 
MailboxManagerConfiguration(BatchSizes.defaultValues());
 
-    static class NoMailboxConfiguration extends MailboxManagerConfiguration {
-        // Spring hack
-
-        public NoMailboxConfiguration() {
-            super(BatchSizes.defaultValues());
-        }
-    }
-
     private final BatchSizes batchSizes;
 
     @Inject
@@ -42,12 +34,4 @@ public class MailboxManagerConfiguration {
     public BatchSizes getBatchSizes() {
         return batchSizes;
     }
-
-    public MessageBatcher getCopyBatcher() {
-        return new 
MessageBatcher(batchSizes.getCopyBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
-    }
-
-    public MessageBatcher getMoveBatcher() {
-        return new 
MessageBatcher(batchSizes.getMoveBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
-    }
 }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
deleted file mode 100644
index 66c00b2cce..0000000000
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import java.util.List;
-
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Flux;
-
-public class MessageBatcher {
-
-    public static final int NO_BATCH_SIZE = 0;
-
-    public interface BatchedOperation {
-        List<MessageRange> execute(MessageRange messageRange) throws 
MailboxException;
-    }
-
-    public interface ReactiveBatchedOperation {
-        Flux<MessageRange> execute(MessageRange messageRange);
-    }
-
-    private final int batchSize;
-
-    public MessageBatcher(int batchSize) {
-        Preconditions.checkArgument(batchSize >= NO_BATCH_SIZE);
-        this.batchSize = batchSize;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public List<MessageRange> batchMessages(MessageRange set, BatchedOperation 
batchedOperation) throws MailboxException {
-        if (batchSize > 0) {
-            return set.split(batchSize)
-                .stream()
-                .flatMap(Throwing.function(range -> 
batchedOperation.execute(range)
-                    .stream()))
-                .collect(ImmutableList.toImmutableList());
-        } else {
-            return batchedOperation.execute(set);
-        }
-    }
-
-    public Flux<MessageRange> batchMessagesReactive(MessageRange set, 
ReactiveBatchedOperation batchedOperation) {
-        if (batchSize > 0) {
-           return Flux.fromIterable(set.split(batchSize))
-                .flatMap(batchedOperation::execute);
-        } else {
-            return batchedOperation.execute(set);
-        }
-    }
-
-}
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index e1d272c5f8..20a750c34a 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -699,23 +699,23 @@ public class StoreMailboxManager implements 
MailboxManager {
     @Override
     public Flux<MessageRange> copyMessagesReactive(MessageRange set, 
MailboxPath from, MailboxPath to, MailboxSession session) {
         return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
-            .flatMapMany(fromTo -> 
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+            .flatMapMany(fromTo -> {
                 StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
                 StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
 
-                return fromMessageManager.copyTo(messageRange, 
toMessageManager, session).flatMapIterable(Function.identity());
-            }));
+                return fromMessageManager.copyTo(set, toMessageManager, 
session);
+            });
     }
 
     @Override
     public Flux<MessageRange> copyMessagesReactive(MessageRange set, MailboxId 
from, MailboxId to, MailboxSession session) {
         return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
-            .flatMapMany(fromTo -> 
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+            .flatMapMany(fromTo -> {
                 StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
                 StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
 
-                return fromMessageManager.copyTo(messageRange, 
toMessageManager, session).flatMapIterable(Function.identity());
-            }));
+                return fromMessageManager.copyTo(set, toMessageManager, 
session);
+            });
     }
 
     @Override
@@ -731,23 +731,23 @@ public class StoreMailboxManager implements 
MailboxManager {
     @Override
     public Flux<MessageRange> moveMessagesReactive(MessageRange set, 
MailboxPath from, MailboxPath to, MailboxSession session) {
         return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
-            .flatMapMany(fromTo -> 
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+            .flatMapMany(fromTo -> {
                 StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
                 StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
 
-                return fromMessageManager.moveTo(messageRange, 
toMessageManager, session).flatMapIterable(Function.identity());
-            }));
+                return fromMessageManager.moveTo(set, toMessageManager, 
session);
+            });
     }
 
     @Override
     public Flux<MessageRange> moveMessagesReactive(MessageRange set, MailboxId 
from, MailboxId to, MailboxSession session) {
         return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
-            .flatMapMany(fromTo -> 
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+            .flatMapMany(fromTo -> {
                 StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
                 StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
 
-                return fromMessageManager.moveTo(messageRange, 
toMessageManager, session).flatMapIterable(Function.identity());
-            }));
+                return fromMessageManager.moveTo(set, toMessageManager, 
session);
+            });
     }
 
     @Override
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index a5fe322689..1e1c29f5aa 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -721,31 +721,31 @@ public class StoreMessageManager implements 
MessageManager {
     /**
      * Copy the {@link MessageRange} to the {@link StoreMessageManager}
      */
-    public Mono<List<MessageRange>> copyTo(MessageRange set, 
StoreMessageManager toMailbox, MailboxSession session) {
+    public Flux<MessageRange> copyTo(MessageRange set, StoreMessageManager 
toMailbox, MailboxSession session) {
         if (!toMailbox.isWriteable(session)) {
-            return Mono.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
         }
         //TODO lock the from mailbox too, in a non-deadlocking manner - how?
-        return 
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+        return 
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
             copy(set, toMailbox, session)
-                .map(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
+                .flatMapIterable(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
             MailboxPathLocker.LockType.Write));
     }
 
     /**
      * Move the {@link MessageRange} to the {@link StoreMessageManager}
      */
-    public Mono<List<MessageRange>> moveTo(MessageRange set, 
StoreMessageManager toMailbox, MailboxSession session) {
+    public Flux<MessageRange> moveTo(MessageRange set, StoreMessageManager 
toMailbox, MailboxSession session) {
         if (!isWriteable(session)) {
-            return Mono.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
         }
         if (!toMailbox.isWriteable(session)) {
-            return Mono.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
         }
         //TODO lock the from mailbox too, in a non-deadlocking manner - how?
-        return 
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+        return 
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
             move(set, toMailbox, session)
-                .map(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
+                .flatMapIterable(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
             MailboxPathLocker.LockType.Write));
     }
 
@@ -866,20 +866,22 @@ public class StoreMessageManager implements 
MessageManager {
     }
 
 
-    private Mono<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange 
set, StoreMessageManager to, MailboxSession session) {
+    private Flux<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange 
set, StoreMessageManager to, MailboxSession session) {
         return retrieveOriginalRows(set, session)
-            .collectList()
-            .flatMap(originalRows -> to.copy(originalRows, 
session).collectList().flatMap(copyResult -> {
-                SortedMap<MessageUid, MessageMetaData> copiedUids = 
collectMetadata(copyResult.iterator());
+            .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+            .concatMap(window -> window
+                .collectList()
+                .flatMap(originalRows -> to.copy(originalRows, 
session).collectList().flatMap(copyResult -> {
+                    SortedMap<MessageUid, MessageMetaData> copiedUids = 
collectMetadata(copyResult.iterator());
 
-                ImmutableList<MessageId> messageIds = originalRows.stream()
-                    
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
-                    .collect(ImmutableList.toImmutableList());
+                    ImmutableList<MessageId> messageIds = originalRows.stream()
+                        
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+                        .collect(ImmutableList.toImmutableList());
 
-                MessageMoves messageMoves = MessageMoves.builder()
-                    .previousMailboxIds(getMailboxEntity().getMailboxId())
-                    .targetMailboxIds(to.getMailboxEntity().getMailboxId(), 
getMailboxEntity().getMailboxId())
-                    .build();
+                    MessageMoves messageMoves = MessageMoves.builder()
+                        .previousMailboxIds(getMailboxEntity().getMailboxId())
+                        
.targetMailboxIds(to.getMailboxEntity().getMailboxId(), 
getMailboxEntity().getMailboxId())
+                        .build();
 
                 return Flux.concat(
                     eventBus.dispatch(EventFactory.added()
@@ -898,23 +900,25 @@ public class StoreMessageManager implements 
MessageManager {
                         
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
                     .then()
                     .thenReturn(copiedUids);
-            }));
+            })));
     }
 
-    private Mono<SortedMap<MessageUid, MessageMetaData>> move(MessageRange 
set, StoreMessageManager to, MailboxSession session) {
+    private Flux<SortedMap<MessageUid, MessageMetaData>> move(MessageRange 
set, StoreMessageManager to, MailboxSession session) {
         return retrieveOriginalRows(set, session)
-            .collectList()
-            .flatMap(originalRows -> to.move(originalRows, 
session).flatMap(moveResult -> {
-                SortedMap<MessageUid, MessageMetaData> moveUids = 
collectMetadata(moveResult.getMovedMessages().iterator());
+            .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+            .concatMap(window -> window
+                .collectList()
+                .flatMap(originalRows -> to.move(originalRows, 
session).flatMap(moveResult -> {
+                    SortedMap<MessageUid, MessageMetaData> moveUids = 
collectMetadata(moveResult.getMovedMessages().iterator());
 
-                ImmutableList<MessageId> messageIds = originalRows.stream()
-                    
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
-                    .collect(ImmutableList.toImmutableList());
+                    ImmutableList<MessageId> messageIds = originalRows.stream()
+                        
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+                        .collect(ImmutableList.toImmutableList());
 
-                MessageMoves messageMoves = MessageMoves.builder()
-                    .previousMailboxIds(getMailboxEntity().getMailboxId())
-                    .targetMailboxIds(to.getMailboxEntity().getMailboxId())
-                    .build();
+                    MessageMoves messageMoves = MessageMoves.builder()
+                        .previousMailboxIds(getMailboxEntity().getMailboxId())
+                        .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+                        .build();
 
                 return Flux.concat(
                     eventBus.dispatch(EventFactory.added()
@@ -940,7 +944,7 @@ public class StoreMessageManager implements MessageManager {
                         
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
                     .then()
                     .thenReturn(moveUids);
-            }));
+            })));
     }
 
     private Flux<MailboxMessage> retrieveOriginalRows(MessageRange set, 
MailboxSession session) {
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
deleted file mode 100644
index a2f214c986..0000000000
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.Lists;
-
-class MessageBatcherTest {
-
-    private MessageBatcher.BatchedOperation incrementBatcher =
-        messageRange -> Lists.<MessageRange>newArrayList(MessageRange.range(
-            messageRange.getUidFrom().next(),
-            messageRange.getUidTo().next()));
-
-    @Test
-    void batchMessagesShouldWorkOnSingleRangeMode() throws Exception {
-        MessageBatcher messageBatcher = new MessageBatcher(0);
-        
-        
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1), 
MessageUid.of(10)), incrementBatcher))
-            .containsOnly(MessageRange.range(MessageUid.of(2), 
MessageUid.of(11)));
-    }
-
-    @Test
-    void batchMessagesShouldWorkWithNonZeroBatchedSize() throws Exception {
-        MessageBatcher messageBatcher = new MessageBatcher(5);
-
-        
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1), 
MessageUid.of(10)), incrementBatcher))
-            .containsOnly(MessageRange.range(MessageUid.of(2), 
MessageUid.of(6)), MessageRange.range(MessageUid.of(7), MessageUid.of(11)));
-    }
-
-    @Test
-    void batchMessagesShouldPropagateExceptions() {
-        MessageBatcher messageBatcher = new MessageBatcher(0);
-
-        assertThatThrownBy(() -> 
messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1), 
MessageUid.of(10)),
-                messageRange -> {
-                    throw new MailboxException();
-                }))
-            .isInstanceOf(MailboxException.class);
-    }
-
-    @Test
-    void messageBatcherShouldThrowOnNegativeBatchSize() {
-        assertThatThrownBy(() -> new MessageBatcher(-1))
-            .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void getBatchSizeShouldReturnTheBatchSize() {
-        int batchSize = 123;
-        MessageBatcher messageBatcher = new MessageBatcher(batchSize);
-        assertThat(messageBatcher.getBatchSize()).isEqualTo(batchSize);
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to