This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 445d2a0c24 JAMES-4041 Fix OOM upon IMAP COPY (#2265)
445d2a0c24 is described below
commit 445d2a0c2470cfd835f212610106e6792ec66aef
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri May 31 08:57:40 2024 +0200
JAMES-4041 Fix OOM upon IMAP COPY (#2265)
The issue arose from command lines like:
```
a0 COPY 1:4294967295 otherMailbox
```
This is semantically valid, but with a batch size of 10 it caused the
batching algorithm to generate 429,496,730 ranges. With a memory overhead
of ~52 bytes per range, these would occupy ~20GB, hence the OOM.
The root cause lies in the MUA using an arbitrarily large range instead of:
```
a0 COPY 1:* otherMailbox
```
We believe the batching semantics are also incorrect. Browsing large amounts
of metadata is actually not an issue, but the post-processing might be.
We thus:
- First consolidate the range against the actual messages in the store
- Window it using the batch size.
- And finally execute the copy/move logic as before.
This avoids generating a large number of potentially empty ranges.
Furthermore, MessageBatcher is removed as it is no longer used.
We can also remove MessageRange::split.
---
.../apache/james/mailbox/model/MessageRange.java | 28 ----
.../james/mailbox/model/MessageRangeTest.java | 25 ---
.../james/mailbox/store/JVMMailboxPathLocker.java | 5 +-
.../mailbox/store/MailboxManagerConfiguration.java | 16 --
.../apache/james/mailbox/store/MessageBatcher.java | 77 ---------
.../james/mailbox/store/StoreMailboxManager.java | 24 +--
.../james/mailbox/store/StoreMessageManager.java | 176 +++++++++++----------
.../james/mailbox/store/MessageBatcherTest.java | 78 ---------
8 files changed, 105 insertions(+), 324 deletions(-)
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MessageRange.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MessageRange.java
index 2084c6c8a4..a4bb08f227 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MessageRange.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MessageRange.java
@@ -226,34 +226,6 @@ public class MessageRange implements Iterable<MessageUid> {
}
}
-
-
- /**
- * Tries to split the given {@link MessageRange} to a {@link List} of
{@link MessageRange}'s which
- * select only a max amount of items. This only work for {@link
MessageRange}'s with {@link Type} of
- * {@link Type#RANGE}.
- */
- public List<MessageRange> split(int maxItems) {
- List<MessageRange> ranges = new ArrayList<>();
- if (getType() == Type.RANGE) {
- long from = getUidFrom().asLong();
- long to = getUidTo().asLong();
- long realTo = to;
- while (from <= realTo) {
- to = Math.min(from + maxItems - 1, realTo);
- if (from == to) {
- ranges.add(MessageUid.of(from).toRange());
- } else {
- ranges.add(MessageRange.range(MessageUid.of(from),
MessageUid.of(to)));
- }
-
- from = to + 1;
- }
- } else {
- ranges.add(this);
- }
- return ranges;
- }
@Override
public int hashCode() {
diff --git
a/mailbox/api/src/test/java/org/apache/james/mailbox/model/MessageRangeTest.java
b/mailbox/api/src/test/java/org/apache/james/mailbox/model/MessageRangeTest.java
index ab10eb00b3..391b9443ea 100644
---
a/mailbox/api/src/test/java/org/apache/james/mailbox/model/MessageRangeTest.java
+++
b/mailbox/api/src/test/java/org/apache/james/mailbox/model/MessageRangeTest.java
@@ -56,31 +56,6 @@ class MessageRangeTest {
List<MessageRange> ranges =
MessageRange.toRanges(Arrays.asList(MessageUid.of(1L), MessageUid.of(2L)));
assertThat(ranges).containsExactly(MessageRange.range(MessageUid.of(1),
MessageUid.of(2)));
}
-
- @Test
- void splitASingletonRangeShouldReturnASingleRange() {
- MessageRange one = MessageUid.of(1).toRange();
- List<MessageRange> ranges = one.split(2);
- assertThat(ranges).containsExactly(MessageUid.of(1).toRange());
- }
-
- @Test
- void splitUnboundedRangeShouldReturnTheSameRange() {
- MessageRange from = MessageRange.from(MessageUid.of(1));
- List<MessageRange> ranges = from.split(2);
-
assertThat(ranges).containsExactly(MessageRange.from(MessageUid.of(1)));
- }
-
- @Test
- void splitTenElementsRangeShouldReturn4Ranges() {
- MessageRange range =
MessageRange.range(MessageUid.of(1),MessageUid.of(10));
- List<MessageRange> ranges = range.split(3);
- assertThat(ranges).containsExactly(
- MessageRange.range(MessageUid.of(1), MessageUid.of(3)),
- MessageRange.range(MessageUid.of(4), MessageUid.of(6)),
- MessageRange.range(MessageUid.of(7), MessageUid.of(9)),
- MessageUid.of(10).toRange());
- }
@Test
void includeShouldBeTrueWhenAfterFrom() {
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
index 987eeb64d9..4be417d491 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/JVMMailboxPathLocker.java
@@ -33,6 +33,7 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxPath;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -65,12 +66,12 @@ public final class JVMMailboxPathLocker implements
MailboxPathLocker {
StampedLock stampedLock = getStampedLock(path);
switch (lockType) {
case Read:
- return Mono.using(stampedLock::readLock,
+ return Flux.using(stampedLock::readLock,
stamp -> Mono.from(execution),
stampedLock::unlockRead)
.subscribeOn(LOCKER_WRAPPER);
case Write:
- return Mono.using(stampedLock::writeLock,
+ return Flux.using(stampedLock::writeLock,
stamp -> Mono.from(execution),
stampedLock::unlockWrite)
.subscribeOn(LOCKER_WRAPPER);
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
index 7ba3373282..637a767288 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MailboxManagerConfiguration.java
@@ -24,14 +24,6 @@ import jakarta.inject.Inject;
public class MailboxManagerConfiguration {
public static final MailboxManagerConfiguration DEFAULT = new
MailboxManagerConfiguration(BatchSizes.defaultValues());
- static class NoMailboxConfiguration extends MailboxManagerConfiguration {
- // Spring hack
-
- public NoMailboxConfiguration() {
- super(BatchSizes.defaultValues());
- }
- }
-
private final BatchSizes batchSizes;
@Inject
@@ -42,12 +34,4 @@ public class MailboxManagerConfiguration {
public BatchSizes getBatchSizes() {
return batchSizes;
}
-
- public MessageBatcher getCopyBatcher() {
- return new
MessageBatcher(batchSizes.getCopyBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
- }
-
- public MessageBatcher getMoveBatcher() {
- return new
MessageBatcher(batchSizes.getMoveBatchSize().orElse(MessageBatcher.NO_BATCH_SIZE));
- }
}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
deleted file mode 100644
index 66c00b2cce..0000000000
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageBatcher.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import java.util.List;
-
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Flux;
-
-public class MessageBatcher {
-
- public static final int NO_BATCH_SIZE = 0;
-
- public interface BatchedOperation {
- List<MessageRange> execute(MessageRange messageRange) throws
MailboxException;
- }
-
- public interface ReactiveBatchedOperation {
- Flux<MessageRange> execute(MessageRange messageRange);
- }
-
- private final int batchSize;
-
- public MessageBatcher(int batchSize) {
- Preconditions.checkArgument(batchSize >= NO_BATCH_SIZE);
- this.batchSize = batchSize;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public List<MessageRange> batchMessages(MessageRange set, BatchedOperation
batchedOperation) throws MailboxException {
- if (batchSize > 0) {
- return set.split(batchSize)
- .stream()
- .flatMap(Throwing.function(range ->
batchedOperation.execute(range)
- .stream()))
- .collect(ImmutableList.toImmutableList());
- } else {
- return batchedOperation.execute(set);
- }
- }
-
- public Flux<MessageRange> batchMessagesReactive(MessageRange set,
ReactiveBatchedOperation batchedOperation) {
- if (batchSize > 0) {
- return Flux.fromIterable(set.split(batchSize))
- .flatMap(batchedOperation::execute);
- } else {
- return batchedOperation.execute(set);
- }
- }
-
-}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 2da630967f..e922c0b83e 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -699,23 +699,23 @@ public class StoreMailboxManager implements
MailboxManager {
@Override
public Flux<MessageRange> copyMessagesReactive(MessageRange set,
MailboxPath from, MailboxPath to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.copyTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.copyTo(set, toMessageManager,
session);
+ });
}
@Override
public Flux<MessageRange> copyMessagesReactive(MessageRange set, MailboxId
from, MailboxId to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.copyTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.copyTo(set, toMessageManager,
session);
+ });
}
@Override
@@ -731,23 +731,23 @@ public class StoreMailboxManager implements
MailboxManager {
@Override
public Flux<MessageRange> moveMessagesReactive(MessageRange set,
MailboxPath from, MailboxPath to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.moveTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.moveTo(set, toMessageManager,
session);
+ });
}
@Override
public Flux<MessageRange> moveMessagesReactive(MessageRange set, MailboxId
from, MailboxId to, MailboxSession session) {
return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
- .flatMapMany(fromTo ->
configuration.getMoveBatcher().batchMessagesReactive(set, messageRange -> {
+ .flatMapMany(fromTo -> {
StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
- return fromMessageManager.moveTo(messageRange,
toMessageManager, session).flatMapIterable(Function.identity());
- }));
+ return fromMessageManager.moveTo(set, toMessageManager,
session);
+ });
}
@Override
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 09aeb04bc8..79f3b52245 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -729,31 +729,31 @@ public class StoreMessageManager implements
MessageManager {
/**
* Copy the {@link MessageRange} to the {@link StoreMessageManager}
*/
- public Mono<List<MessageRange>> copyTo(MessageRange set,
StoreMessageManager toMailbox, MailboxSession session) {
+ public Flux<MessageRange> copyTo(MessageRange set, StoreMessageManager
toMailbox, MailboxSession session) {
if (!toMailbox.isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
//TODO lock the from mailbox too, in a non-deadlocking manner - how?
- return
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
copy(set, toMailbox, session)
- .map(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
MailboxPathLocker.LockType.Write));
}
/**
* Move the {@link MessageRange} to the {@link StoreMessageManager}
*/
- public Mono<List<MessageRange>> moveTo(MessageRange set,
StoreMessageManager toMailbox, MailboxSession session) {
+ public Flux<MessageRange> moveTo(MessageRange set, StoreMessageManager
toMailbox, MailboxSession session) {
if (!isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
if (!toMailbox.isWriteable(session)) {
- return Mono.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
}
//TODO lock the from mailbox too, in a non-deadlocking manner - how?
- return
Mono.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
move(set, toMailbox, session)
- .map(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
MailboxPathLocker.LockType.Write));
}
@@ -885,85 +885,89 @@ public class StoreMessageManager implements
MessageManager {
}
- private Mono<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange
set, StoreMessageManager to, MailboxSession session) {
+ private Flux<SortedMap<MessageUid, MessageMetaData>> copy(MessageRange
set, StoreMessageManager to, MailboxSession session) {
return retrieveOriginalRows(set, session)
- .collectList()
- .flatMap(originalRows -> to.copy(originalRows,
session).collectList().flatMap(copyResult -> {
- SortedMap<MessageUid, MessageMetaData> copiedUids =
collectMetadata(copyResult.iterator());
-
- ImmutableList<MessageId> messageIds = originalRows.stream()
-
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
- .collect(ImmutableList.toImmutableList());
-
- MessageMoves messageMoves = MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId(),
getMailboxEntity().getMailboxId())
- .build();
-
- return Flux.concat(
- eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(to.getMailboxEntity())
- .metaData(copiedUids)
- .isDelivery(!IS_DELIVERY)
- .isAppended(!IS_APPENDED)
- .build(),
- new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
- eventBus.dispatch(EventFactory.moved()
- .messageMoves(messageMoves)
- .messageId(messageIds)
- .session(session)
- .build(),
-
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
- .then()
- .thenReturn(copiedUids);
- }));
- }
-
- private Mono<SortedMap<MessageUid, MessageMetaData>> move(MessageRange
set, StoreMessageManager to, MailboxSession session) {
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window
+ .collectList()
+ .flatMap(originalRows -> to.copy(originalRows,
session).collectList().flatMap(copyResult -> {
+ SortedMap<MessageUid, MessageMetaData> copiedUids =
collectMetadata(copyResult.iterator());
+
+ ImmutableList<MessageId> messageIds = originalRows.stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .collect(ImmutableList.toImmutableList());
+
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+
.targetMailboxIds(to.getMailboxEntity().getMailboxId(),
getMailboxEntity().getMailboxId())
+ .build();
+
+ return Flux.concat(
+ eventBus.dispatch(EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(copiedUids)
+ .isDelivery(!IS_DELIVERY)
+ .isAppended(!IS_APPENDED)
+ .build(),
+ new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+ eventBus.dispatch(EventFactory.moved()
+ .messageMoves(messageMoves)
+ .messageId(messageIds)
+ .session(session)
+ .build(),
+
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
+ .then()
+ .thenReturn(copiedUids);
+ })));
+ }
+
+ private Flux<SortedMap<MessageUid, MessageMetaData>> move(MessageRange
set, StoreMessageManager to, MailboxSession session) {
return retrieveOriginalRows(set, session)
- .collectList()
- .flatMap(originalRows -> to.move(originalRows,
session).flatMap(moveResult -> {
- SortedMap<MessageUid, MessageMetaData> moveUids =
collectMetadata(moveResult.getMovedMessages().iterator());
-
- ImmutableList<MessageId> messageIds = originalRows.stream()
-
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
- .collect(ImmutableList.toImmutableList());
-
- MessageMoves messageMoves = MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId())
- .build();
-
- return Flux.concat(
- eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(to.getMailboxEntity())
- .metaData(moveUids)
- .isDelivery(!IS_DELIVERY)
- .isAppended(!IS_APPENDED)
- .movedFrom(getId())
- .build(),
- new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
- eventBus.dispatch(EventFactory.expunged()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(getMailboxEntity())
- .addMetaData(moveResult.getOriginalMessages())
- .movedTo(to.getId())
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId())),
- eventBus.dispatch(EventFactory.moved()
- .messageMoves(messageMoves)
- .messageId(messageIds)
- .session(session)
- .build(),
-
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
- .then()
- .thenReturn(moveUids);
- }));
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window
+ .collectList()
+ .flatMap(originalRows -> to.move(originalRows,
session).flatMap(moveResult -> {
+ SortedMap<MessageUid, MessageMetaData> moveUids =
collectMetadata(moveResult.getMovedMessages().iterator());
+
+ ImmutableList<MessageId> messageIds = originalRows.stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .collect(ImmutableList.toImmutableList());
+
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+ .build();
+
+ return Flux.concat(
+ eventBus.dispatch(EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(moveUids)
+ .isDelivery(!IS_DELIVERY)
+ .isAppended(!IS_APPENDED)
+ .movedFrom(getId())
+ .build(),
+ new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+ eventBus.dispatch(EventFactory.expunged()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(getMailboxEntity())
+
.addMetaData(moveResult.getOriginalMessages())
+ .movedTo(to.getId())
+ .build(),
+ new
MailboxIdRegistrationKey(mailbox.getMailboxId())),
+ eventBus.dispatch(EventFactory.moved()
+ .messageMoves(messageMoves)
+ .messageId(messageIds)
+ .session(session)
+ .build(),
+
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())))
+ .then()
+ .thenReturn(moveUids);
+ })));
}
private Flux<MailboxMessage> retrieveOriginalRows(MessageRange set,
MailboxSession session) {
diff --git
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
deleted file mode 100644
index a2f214c986..0000000000
---
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageBatcherTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.MessageRange;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.Lists;
-
-class MessageBatcherTest {
-
- private MessageBatcher.BatchedOperation incrementBatcher =
- messageRange -> Lists.<MessageRange>newArrayList(MessageRange.range(
- messageRange.getUidFrom().next(),
- messageRange.getUidTo().next()));
-
- @Test
- void batchMessagesShouldWorkOnSingleRangeMode() throws Exception {
- MessageBatcher messageBatcher = new MessageBatcher(0);
-
-
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)), incrementBatcher))
- .containsOnly(MessageRange.range(MessageUid.of(2),
MessageUid.of(11)));
- }
-
- @Test
- void batchMessagesShouldWorkWithNonZeroBatchedSize() throws Exception {
- MessageBatcher messageBatcher = new MessageBatcher(5);
-
-
assertThat(messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)), incrementBatcher))
- .containsOnly(MessageRange.range(MessageUid.of(2),
MessageUid.of(6)), MessageRange.range(MessageUid.of(7), MessageUid.of(11)));
- }
-
- @Test
- void batchMessagesShouldPropagateExceptions() {
- MessageBatcher messageBatcher = new MessageBatcher(0);
-
- assertThatThrownBy(() ->
messageBatcher.batchMessages(MessageRange.range(MessageUid.of(1),
MessageUid.of(10)),
- messageRange -> {
- throw new MailboxException();
- }))
- .isInstanceOf(MailboxException.class);
- }
-
- @Test
- void messageBatcherShouldThrowOnNegativeBatchSize() {
- assertThatThrownBy(() -> new MessageBatcher(-1))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- void getBatchSizeShouldReturnTheBatchSize() {
- int batchSize = 123;
- MessageBatcher messageBatcher = new MessageBatcher(batchSize);
- assertThat(messageBatcher.getBatchSize()).isEqualTo(batchSize);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]