This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.8.x by this push:
new 7bd5b0e65f JAMES-4041 Fix OOM upon IMAP COPY
7bd5b0e65f is described below
commit 7bd5b0e65f95eb9441f178957bd9940bdbcf2c97
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 30 09:45:47 2024 +0200
JAMES-4041 Fix OOM upon IMAP COPY
The issue arose from command lines like:
```
a0 COPY 1:4294967295 otherMailbox
```
This is semantically valid, but with a batch size of 10 it caused the batching
algorithm to generate 429,496,730 ranges. With a memory overhead of ~52 bytes
per range, these would occupy ~20GB, hence the OOM.
The root cause lies in the MUA using an arbitrarily large range instead of:
```
a0 COPY 1:* otherMailbox
```
We believe the batching semantics are also incorrect. Browsing large amounts
of metadata is actually not an issue, but the post-processing might be.
We thus:
- First consolidate the range against the actual messages in the store
- Window it using the batch size.
- And finally execute the copy/move logic as before.
This avoids generating a large number of potentially empty ranges.
MessageBatcher is furthermore removed as it is no longer used.
---
.../james/mailbox/store/JVMMailboxPathLocker.java | 5 +-
.../mailbox/store/MailboxManagerConfiguration.java | 16 -----
.../apache/james/mailbox/store/MessageBatcher.java | 77 ---------------------
.../james/mailbox/store/StoreMailboxManager.java | 24 +++----
.../james/mailbox/store/StoreMessageManager.java | 70 ++++++++++---------
.../james/mailbox/store/MessageBatcherTest.java | 78 ----------------------
6 files changed, 52 insertions(+), 218 deletions(-)
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
index 987eeb64d9..4be417d491 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
@@ -33,6 +33,7 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxPath;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -65,12 +66,12 @@ public final class JVMMailboxPathLocker implements
MailboxPathLocker {
StampedLock stampedLock = getStampedLock(path);
switch (lockType) {
case Read:
- return Mono.using(stampedLock::readLock,
+ return Flux.using(stampedLock::readLock,
stamp -> Mono.from(execution),
stampedLock::unlockRead)
.subscribeOn(LOCKER_WRAPPER);
case Write:
- return Mono.using(stampedLock::writeLock,
+ return Flux.using(stampedLock::writeLock,
stamp -> Mono.from(execution),
stampedLock::unlockWrite)
.subscribeOn(LOCKER_WRAPPER);
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
index 5a7e5babd0..ecd16837ab 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
@@ -24,14 +24,6 @@ import javax.inject.Inject;
public class MailboxManagerConfiguration {
public static final MailboxManagerConfiguration DEFAULT = new
MailboxManagerConfiguration(BatchSizes.defaultValues());
- static class NoMailboxConfiguration extends MailboxManagerConfiguration {
- // Spring hack
-
- public NoMailboxConfiguration() {
- super(BatchSizes.defaultValues());
- }
- }
-
private final BatchSizes batchSizes;
@Inject
@@ -42,12 +34,4 @@ public class MailboxManagerConfiguration {
public BatchSizes getBatchSizes() {
return batchSizes;
}
-
- public MessageBatcher getCopyBatcher() {
- return new
MessageBatcher(batchSizes.getCopyBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
- }
-
- public MessageBatcher getMoveBatcher() {
- return new
MessageBatcher(batchSizes.getMoveBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
- }
}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
deleted file mode 100644
index 66c00b2cce..0000000000
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import java.util.List;
-
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Flux;
-
-public class MessageBatcher {
-
- public static final int NO_BATCH_SIZE = 0;
-
- public interface BatchedOperation {
- List<MessageRange> execute(MessageRange messageRange) throws
MailboxException;
- }
-
- public interface ReactiveBatchedOperation {
- Flux<MessageRange> execute(MessageRange messageRange);
- }
-
- private final int batchSize;
-
- public MessageBatcher(int batchSize) {
- Preconditions.checkArgument(batchSize >= NO_BATCH_SIZE);
- this.batchSize = batchSize;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public List<MessageRange> batchMessages(MessageRange set, BatchedOperation
batchedOperation) throws MailboxException {
- if (batchSize > 0) {
- return set.split(batchSize)
- .stream()
- .flatMap(Throwing.function(range ->
batchedOperation.execute(range)
- .stream()))
- .collect(ImmutableList.toImmutableList());
- } else {
- return batchedOperation.execute(set);
- }
- }
-
- public Flux<MessageRange> batchMessagesReactive(MessageRange set,
ReactiveBatchedOperation batchedOperation) {
- if (batchSize > 0) {
- return Flux.fromIterable(set.split(batchSize))
- .flatMap(batchedOperation::execute);
- } else {
- return batchedOperation.execute(set);
- }
- }
-
-}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index e1d272c5f8..20a750c34a 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -699,23 +699,23 @@ public class StoreMailboxManager implements
MailboxManager {
@Override
public Flux<MessageRange> copyMessagesReactive(MessageRange set,
MailboxPath from, MailboxPath to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.copyTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.copyTo(set, toMessageManager,
session);
+ });
}
@Override
public Flux<MessageRange> copyMessagesReactive(MessageRange set, MailboxId
from, MailboxId to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.copyTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.copyTo(set, toMessageManager,
session);
+ });
}
@Override
@@ -731,23 +731,23 @@ public class StoreMailboxManager implements
MailboxManager {
@Override
public Flux<MessageRange> moveMessagesReactive(MessageRange set,
MailboxPath from, MailboxPath to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.moveTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.moveTo(set, toMessageManager,
session);
+ });
}
@Override
public Flux<MessageRange> moveMessagesReactive(MessageRange set, MailboxId
from, MailboxId to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.moveTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.moveTo(set, toMessageManager,
session);
+ });
}
@Override
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index a5fe322689..1e1c29f5aa 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -721,31 +721,31 @@ public class StoreMessageManager implements
MessageManager {
/**
* Copy the {@link MessageRange} to the {@link StoreMessageManager}
*/
- public Mono<List<MessageRange>> copyTo(MessageRange set,
StoreMessageManager toMailbox, MailboxSession session) {
+ public Flux<MessageRange> copyTo(MessageRange set, StoreMessageManager
toMailbox, MailboxSession session) {
if (!toMailbox.isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
//TODO lock the from mailbox too, in a non-deadlocking manner - how?
- return
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
copy(set, toMailbox, session)
- .map(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
MailboxPathLocker.LockType.Write));
}
/**
* Move the {@link MessageRange} to the {@link StoreMessageManager}
*/
- public Mono<List<MessageRange>> moveTo(MessageRange set,
StoreMessageManager toMailbox, MailboxSession session) {
+ public Flux<MessageRange> moveTo(MessageRange set, StoreMessageManager
toMailbox, MailboxSession session) {
if (!isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
if (!toMailbox.isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
//TODO lock the from mailbox too, in a non-deadlocking manner - how?
- return
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
move(set, toMailbox, session)
- .map(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
MailboxPathLocker.LockType.Write));
}
@@ -866,20 +866,22 @@ public class StoreMessageManager implements
MessageManager {
}
- private Mono<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange
set, StoreMessageManager to, MailboxSession session) {
+ private Flux<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange
set, StoreMessageManager to, MailboxSession session) {
return retrieveOriginalRows(set, session)
- .collectList()
- .flatMap(originalRows -> to.copy(originalRows,
session).collectList().flatMap(copyResult -> {
- SortedMap<MessageUid, MessageMetaData> copiedUids =
collectMetadata(copyResult.iterator());
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window
+ .collectList()
+ .flatMap(originalRows -> to.copy(originalRows,
session).collectList().flatMap(copyResult -> {
+ SortedMap<MessageUid, MessageMetaData> copiedUids =
collectMetadata(copyResult.iterator());
- ImmutableList<MessageId> messageIds = originalRows.stream()
-
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
- .collect(ImmutableList.toImmutableList());
+ ImmutableList<MessageId> messageIds = originalRows.stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .collect(ImmutableList.toImmutableList());
- MessageMoves messageMoves = MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId(),
getMailboxEntity().getMailboxId())
- .build();
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+
.targetMailboxIds(to.getMailboxEntity().getMailboxId(),
getMailboxEntity().getMailboxId())
+ .build();
return Flux.concat(
eventBus.dispatch(EventFactory.added()
@@ -898,23 +900,25 @@ public class StoreMessageManager implements
MessageManager {
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
.then()
.thenReturn(copiedUids);
- }));
+ })));
}
- private Mono<SortedMap<MessageUid, MessageMetaData>> move(MessageRange
set, StoreMessageManager to, MailboxSession session) {
+ private Flux<SortedMap<MessageUid, MessageMetaData>> move(MessageRange
set, StoreMessageManager to, MailboxSession session) {
return retrieveOriginalRows(set, session)
- .collectList()
- .flatMap(originalRows -> to.move(originalRows,
session).flatMap(moveResult -> {
- SortedMap<MessageUid, MessageMetaData> moveUids =
collectMetadata(moveResult.getMovedMessages().iterator());
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window
+ .collectList()
+ .flatMap(originalRows -> to.move(originalRows,
session).flatMap(moveResult -> {
+ SortedMap<MessageUid, MessageMetaData> moveUids =
collectMetadata(moveResult.getMovedMessages().iterator());
- ImmutableList<MessageId> messageIds = originalRows.stream()
-
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
- .collect(ImmutableList.toImmutableList());
+ ImmutableList<MessageId> messageIds = originalRows.stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .collect(ImmutableList.toImmutableList());
- MessageMoves messageMoves = MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId())
- .build();
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+ .build();
return Flux.concat(
eventBus.dispatch(EventFactory.added()
@@ -940,7 +944,7 @@ public class StoreMessageManager implements MessageManager {
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
.then()
.thenReturn(moveUids);
- }));
+ })));
}
private Flux<MailboxMessage> retrieveOriginalRows(MessageRange set,
MailboxSession session) {
diff --git
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
deleted file mode 100644
index a2f214c986..0000000000
---
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.Lists;
-
-class MessageBatcherTest {
-
- private MessageBatcher.BatchedOperation incrementBatcher =
- messageRange -> Lists.<MessageRange>newArrayList(MessageRange.range(
- messageRange.getUidFrom().next(),
- messageRange.getUidTo().next()));
-
- @Test
- void batchMessagesShouldWorkOnSingleRangeMode() throws Exception {
- MessageBatcher messageBatcher = new MessageBatcher(0);
-
-
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)), incrementBatcher))
- .containsOnly(MessageRange.range(MessageUid.of(2),
MessageUid.of(11)));
- }
-
- @Test
- void batchMessagesShouldWorkWithNonZeroBatchedSize() throws Exception {
- MessageBatcher messageBatcher = new MessageBatcher(5);
-
-
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)), incrementBatcher))
- .containsOnly(MessageRange.range(MessageUid.of(2),
MessageUid.of(6)), MessageRange.range(MessageUid.of(7), MessageUid.of(11)));
- }
-
- @Test
- void batchMessagesShouldPropagateExceptions() {
- MessageBatcher messageBatcher = new MessageBatcher(0);
-
- assertThatThrownBy(() ->
messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)),
- messageRange -> {
- throw new MailboxException();
- }))
- .isInstanceOf(MailboxException.class);
- }
-
- @Test
- void messageBatcherShouldThrowOnNegativeBatchSize() {
- assertThatThrownBy(() -> new MessageBatcher(-1))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- void getBatchSizeShouldReturnTheBatchSize() {
- int batchSize = 123;
- MessageBatcher messageBatcher = new MessageBatcher(batchSize);
- assertThat(messageBatcher.getBatchSize()).isEqualTo(batchSize);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]