JAMES-2550 Mailqueue with reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f121dd8d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f121dd8d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f121dd8d Branch: refs/heads/master Commit: f121dd8d413321db9a85a5cce42b572da392c2b5 Parents: e96b5cb Author: Benoit Tellier <[email protected]> Authored: Tue Dec 4 16:10:33 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Thu Dec 6 15:04:29 2018 +0700 ---------------------------------------------------------------------- pom.xml | 7 +++ server/queue/queue-rabbitmq/pom.xml | 9 +++ .../apache/james/queue/rabbitmq/Dequeuer.java | 2 +- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +- .../queue/rabbitmq/view/api/MailQueueView.java | 2 +- .../rabbitmq/view/cassandra/BrowseStartDAO.java | 26 ++++---- .../cassandra/CassandraMailQueueBrowser.java | 63 +++++++++----------- .../cassandra/CassandraMailQueueMailDelete.java | 31 +++++----- .../cassandra/CassandraMailQueueMailStore.java | 7 ++- .../view/cassandra/CassandraMailQueueView.java | 42 ++++++------- .../view/cassandra/DeletedMailsDAO.java | 20 +++---- .../view/cassandra/EnqueuedMailsDAO.java | 25 ++++---- .../view/cassandra/model/BucketedSlices.java | 22 +++---- .../view/cassandra/BrowseStartDAOTest.java | 27 +++++---- .../CassandraMailQueueViewTestFactory.java | 10 +++- .../view/cassandra/DeletedMailsDAOTest.java | 22 +++---- .../view/cassandra/EnqueuedMailsDaoTest.java | 16 ++--- .../cassandra/model/BucketedSlicesTest.java | 12 ++-- 18 files changed, 178 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 300a18b..c4d85bf 100644 --- a/pom.xml +++ b/pom.xml @@ -666,6 +666,13 @@ <dependencyManagement> <dependencies> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-bom</artifactId> + <version>Bismuth-RELEASE</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> <groupId>${james.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 14564ff..ab4ffff 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -171,6 +171,15 @@ <version>${feign.version}</version> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 6caef63..76e9838 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -103,7 +103,7 @@ class Dequeuer { if (success) { dequeueMetric.increment(); rabbitClient.ack(deliveryTag); - mailQueueView.delete(DeleteCondition.withName(mail.getName())).join(); + mailQueueView.delete(DeleteCondition.withName(mail.getName())); } else { rabbitClient.nack(deliveryTag); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 0909469..1873313 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -93,12 +93,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue { @Override public long clear() { - return mailQueueView.delete(DeleteCondition.all()).join(); + return mailQueueView.delete(DeleteCondition.all()); } @Override public long remove(Type type, String value) { - return mailQueueView.delete(DeleteCondition.from(type, value)).join(); + return mailQueueView.delete(DeleteCondition.from(type, value)); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index a291d61..12ca723 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -36,7 +36,7 @@ public interface MailQueueView { CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem); - CompletableFuture<Long> delete(DeleteCondition deleteCondition); + long delete(DeleteCondition deleteCondition); CompletableFuture<Boolean> isPresent(Mail mail); http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java index 4425c46..9552f5c 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java @@ -31,8 +31,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import java.time.Instant; import java.util.Date; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; @@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Mono; public class BrowseStartDAO { @@ -79,28 +78,29 @@ public class BrowseStartDAO { .value(QUEUE_NAME, bindMarker(QUEUE_NAME))); } - CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) { + Mono<Instant> findBrowseStart(MailQueueName queueName) { return selectOne(queueName) - .thenApply(optional -> optional.map(this::getBrowseStart)); + .map(this::getBrowseStart); } - CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { - return executor.executeVoid(updateOne.bind() + Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { + return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind() .setTimestamp(BROWSE_START, Date.from(sliceStart)) - .setString(QUEUE_NAME, mailQueueName.asString())); + .setString(QUEUE_NAME, mailQueueName.asString()))); } - CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { - return executor.executeVoid(insertOne.bind() + Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { + return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind() .setTimestamp(BROWSE_START, Date.from(sliceStart)) - .setString(QUEUE_NAME, mailQueueName.asString())); + .setString(QUEUE_NAME, mailQueueName.asString()))); } @VisibleForTesting - CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) { - return executor.executeSingleRow( + Mono<Row> selectOne(MailQueueName queueName) { + return Mono.fromCompletionStage(executor.executeSingleRow( selectOne.bind() - .setString(QUEUE_NAME, queueName.asString())); + .setString(QUEUE_NAME, queueName.asString()))) + .flatMap(Mono::justOrEmpty); } private Instant getBrowseStart(Row row) { http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index a5a9bb9..4279c15 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -21,16 +21,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; -import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill; import java.time.Clock; import java.time.Instant; import java.util.Comparator; import java.util.Iterator; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.stream.IntStream; -import java.util.stream.Stream; +import java.util.List; import javax.inject.Inject; import javax.mail.MessagingException; @@ -44,12 +40,14 @@ import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; -import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class CassandraMailQueueBrowser { @@ -101,23 +99,24 @@ public class CassandraMailQueueBrowser { this.clock = clock; } - CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) { + Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) { return browseReferences(queueName) - .map(this::toMailFuture, FluentFutureStream::unboxFuture) - .map(ManageableMailQueue.MailQueueItemView::new) - .completableFuture(); + .flatMapSequential(this::toMailFuture) + .map(ManageableMailQueue.MailQueueItemView::new); } - FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) { - return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName) - .thenApply(this::allSlicesStartingAt)) - .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture); + Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) { + return browseStartDao.findBrowseStart(queueName) + .flatMapMany(this::allSlicesStartingAt) + .flatMapSequential(slice -> browseSlice(queueName, slice)) + .flatMapSequential(Flux::fromIterable) + .subscribeOn(Schedulers.parallel()); } - private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { + private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem(); - return mimeMessageStore.read(enqueuedItem.getPartsId()) - .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage)); + return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId())) + .map(mimeMessage -> toMail(enqueuedItem, mimeMessage)); } private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) { @@ -132,31 +131,25 @@ public class CassandraMailQueueBrowser { return mail; } - private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) { - return FluentFutureStream.of( + private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) { + return allBucketIds() - .map(bucketId -> - browseBucket(queueName, slice, bucketId).completableFuture()), - FluentFutureStream::unboxStream) - .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); + .flatMap(bucketId -> browseBucket(queueName, slice, bucketId)) + .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); } - private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { - return FluentFutureStream.of( - enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)) - .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey())); + private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { + return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId) + .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey())); } - private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) { - return maybeBrowseStart - .map(Slice::of) - .map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow())) - .orElse(Stream.empty()); + private Flux<Slice> allSlicesStartingAt(Instant browseStart) { + return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow())); } - private Stream<BucketId> allBucketIds() { - return IntStream + private Flux<BucketId> allBucketIds() { + return Flux .range(0, configuration.getBucketCount()) - .mapToObj(BucketId::of); + .map(BucketId::of); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index b94b1ed..57732cb 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -20,10 +20,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.time.Instant; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Stream; import javax.inject.Inject; @@ -32,6 +29,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import org.apache.mailet.Mail; +import reactor.core.publisher.Mono; + public class CassandraMailQueueMailDelete { private final DeletedMailsDAO deletedMailsDao; @@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete { this.random = random; } - CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) { + Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) { return considerDeleted(MailKey.fromMail(mail), mailQueueName); } - CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) { + Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) { return deletedMailsDao .markAsDeleted(mailQueueName, mailKey) - .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName)); + .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName)); } - CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) { + Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) { return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail)); } - CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) { - return findNewBrowseStart(mailQueueName) - .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart)); + void updateBrowseStart(MailQueueName mailQueueName) { + Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName); + updateNewBrowseStart(mailQueueName, newBrowseStart); } private void maybeUpdateBrowseStart(MailQueueName mailQueueName) { if (shouldUpdateBrowseStart()) { - updateBrowseStart(mailQueueName).join(); + updateBrowseStart(mailQueueName); } } - private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) { + private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) { return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart()) - .completableFuture() - .thenApply(Stream::findFirst); + .next(); } - private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) { + private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) { return maybeNewBrowseStart - .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant)) - .orElse(CompletableFuture.completedFuture(null)); + .flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant)); } private boolean shouldUpdateBrowseStart() { http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java index 41c28a9..1dd07f5 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java @@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.time.Clock; import java.time.Instant; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; @@ -32,6 +31,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Bucke import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; import org.apache.mailet.Mail; +import reactor.core.publisher.Mono; + public class CassandraMailQueueMailStore { private final EnqueuedMailsDAO enqueuedMailsDao; @@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore { this.clock = clock; } - CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) { + Mono<Void> storeMail(EnqueuedItem enqueuedItem) { EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem); return enqueuedMailsDao.insert(enqueuedItemAndSlicing); } - CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) { + Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) { return browseStartDao .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index 4980f46..a0ca3cd 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -20,7 +20,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; import javax.inject.Inject; @@ -33,9 +32,10 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement; import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; -import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; +import reactor.core.publisher.Mono; + public class CassandraMailQueueView implements MailQueueView { public static class Factory implements MailQueueView.Factory { @@ -80,62 +80,54 @@ public class CassandraMailQueueView implements MailQueueView { @Override public void initialize(MailQueueName mailQueueName) { - storeHelper.initializeBrowseStart(mailQueueName) - .join(); + storeHelper.initializeBrowseStart(mailQueueName).block(); } @Override public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) { - return storeHelper.storeMail(enqueuedItem); + return storeHelper.storeMail(enqueuedItem).toFuture(); } @Override public ManageableMailQueue.MailQueueIterator browse() { return new CassandraMailQueueBrowser.CassandraMailQueueIterator( cassandraMailQueueBrowser.browse(mailQueueName) - .join() + .toIterable() .iterator()); } @Override public long getSize() { - return cassandraMailQueueBrowser.browseReferences(mailQueueName) - .join() - .count(); + return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block(); } @Override - public CompletableFuture<Long> delete(DeleteCondition deleteCondition) { + public long delete(DeleteCondition deleteCondition) { if (deleteCondition instanceof DeleteCondition.WithName) { DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition; - - return delete(MailKey.of(nameDeleteCondition.getName())).thenApply(any -> 1L); + return delete(MailKey.of(nameDeleteCondition.getName())).map(any -> 1L).block(); } - return browseThenDelete(deleteCondition); } - private CompletableFuture<Long> browseThenDelete(DeleteCondition deleteCondition) { - CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName) + private long browseThenDelete(DeleteCondition deleteCondition) { + return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(EnqueuedItemWithSlicingContext::getEnqueuedItem) .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail())) - .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName), - FluentFutureStream::unboxFuture) - .completableFuture() - .thenApply(Stream::count); - - result.thenRunAsync(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)); - - return result; + .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName)) + .count() + .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) + .block(); } - private CompletableFuture<Void> delete(MailKey mailKey) { + private Mono<Void> delete(MailKey mailKey) { return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName); } @Override public CompletableFuture<Boolean> isPresent(Mail mail) { return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName) - .thenApply(bool -> !bool); + .map(bool -> !bool) + .toFuture(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java index c5feefc..a1986e1 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java @@ -27,8 +27,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME; -import java.util.concurrent.CompletableFuture; - import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -37,6 +35,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; public class DeletedMailsDAO { @@ -64,20 +63,21 @@ public class DeletedMailsDAO { .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); } - CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) { - return executor.executeVoid(insertOne.bind() + Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) { + return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind() .setString(QUEUE_NAME, mailQueueName.asString()) - .setString(MAIL_KEY, mailKey.getMailKey())); + .setString(MAIL_KEY, mailKey.getMailKey()))); } - CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) { - return executor.executeReturnExists( + Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) { + return Mono.fromCompletionStage(executor.executeReturnExists( selectOne.bind() .setString(QUEUE_NAME, mailQueueName.asString()) - .setString(MAIL_KEY, mailKey.getMailKey())); + .setString(MAIL_KEY, mailKey.getMailKey()))); } - CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) { - return isDeleted(mailQueueName, mailKey).thenApply(b -> !b); + Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) { + return isDeleted(mailQueueName, mailKey) + .map(b -> !b); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java index fef9ba1..eb1e9cf 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java @@ -47,8 +47,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; import java.util.Date; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; import javax.inject.Inject; @@ -64,6 +62,10 @@ import org.apache.mailet.Mail; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class EnqueuedMailsDAO { @@ -73,10 +75,12 @@ public class EnqueuedMailsDAO { private final CassandraUtils cassandraUtils; private final CassandraTypesProvider cassandraTypesProvider; private final BlobId.Factory blobFactory; + private Scheduler scheduler; @Inject EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider, BlobId.Factory blobIdFactory) { + this.scheduler = Schedulers.parallel(); this.executor = new CassandraAsyncExecutor(session); this.cassandraUtils = cassandraUtils; this.cassandraTypesProvider = cassandraTypesProvider; @@ -114,13 +118,13 @@ public class EnqueuedMailsDAO { .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS))); } - CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) { + Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) { EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem(); EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext(); Mail mail = enqueuedItem.getMail(); MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId(); - return executor.executeVoid(insert.bind() + return Mono.fromCompletionStage(executor.executeVoid(insert.bind() .setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString()) .setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart())) .setInt(BUCKET_ID, slicingContext.getBucketId().getValue()) @@ -136,19 +140,20 @@ public class EnqueuedMailsDAO { .setString(REMOTE_HOST, mail.getRemoteHost()) .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) .setMap(ATTRIBUTES, toRawAttributeMap(mail)) - .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))); + .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())))); } - CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails( + Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails( MailQueueName queueName, Slice slice, BucketId bucketId) { - return executor.execute( + return Mono.fromCompletionStage(executor.execute( selectFrom.bind() .setString(QUEUE_NAME, queueName.asString()) .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant())) - .setInt(BUCKET_ID, bucketId.getValue())) - .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) - .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory))); + .setInt(BUCKET_ID, bucketId.getValue()))) + .map(cassandraUtils::convertToStream) + .flatMapMany(Flux::fromStream) + .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java index d35b6d7..2f60736 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java @@ -69,17 +69,6 @@ public class BucketedSlices { return new Slice(sliceStartInstant); } - public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt, Duration windowSize) { - long sliceCount = calculateSliceCount(firstSlice, endAt, windowSize); - long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond(); - long sliceWindowSizeInSecond = windowSize.getSeconds(); - - return LongStream.range(0, sliceCount) - .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition) - .mapToObj(Instant::ofEpochSecond) - .map(Slice::of); - } - private static long calculateSliceCount(Slice firstSlice, Instant endAt, Duration windowSize) { long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond(); long endAtSeconds = endAt.getEpochSecond(); @@ -104,6 +93,17 @@ public class BucketedSlices { return startSliceInstant; } + public Stream<Slice> allSlicesTill(Instant endAt, Duration windowSize) { + long sliceCount = calculateSliceCount(this, endAt, windowSize); + long startAtSeconds = this.getStartSliceInstant().getEpochSecond(); + long sliceWindowSizeInSecond = windowSize.getSeconds(); + + return LongStream.range(0, sliceCount) + .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition) + .mapToObj(Instant::ofEpochSecond) + .map(Slice::of); + } + @Override public final boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java index ef844c6..c427d44 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java @@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import static org.assertj.core.api.Assertions.assertThat; import java.time.Instant; -import java.util.Optional; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -31,6 +30,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; + class BrowseStartDAOTest { private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); @@ -50,37 +51,37 @@ class BrowseStartDAOTest { @Test void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() { - testee.updateBrowseStart(OUT_GOING_1, NOW).join(); + testee.updateBrowseStart(OUT_GOING_1, NOW).block(); - Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join(); - assertThat(firstEnqueuedItemFromQueue2) + Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2); + assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block()) .isEmpty(); } @Test void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() { - testee.updateBrowseStart(OUT_GOING_1, NOW).join(); - testee.updateBrowseStart(OUT_GOING_2, NOW).join(); + testee.updateBrowseStart(OUT_GOING_1, NOW).block(); + testee.updateBrowseStart(OUT_GOING_2, NOW).block(); - Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join(); - assertThat(firstEnqueuedItemFromQueue2) + Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2); + assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block()) .isNotEmpty(); } @Test void updateFirstEnqueuedTimeShouldWork() { - testee.updateBrowseStart(OUT_GOING_1, NOW).join(); + testee.updateBrowseStart(OUT_GOING_1, NOW).block(); - assertThat(testee.selectOne(OUT_GOING_1).join()) + assertThat(testee.selectOne(OUT_GOING_1).flux().collectList().block()) .isNotEmpty(); } @Test void insertInitialBrowseStartShouldInsertFirstInstant() { - testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join(); - testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join(); + testee.insertInitialBrowseStart(OUT_GOING_1, NOW).block(); + testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).block(); - assertThat(testee.findBrowseStart(OUT_GOING_1).join()) + assertThat(testee.findBrowseStart(OUT_GOING_1).flux().collectList().block()) .contains(NOW); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java index 7eadf9c..d9a451a 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java @@ -25,11 +25,11 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore; import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao; import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; -import org.apache.james.blob.api.HashBlobId; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule; @@ -38,6 +38,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin import com.datastax.driver.core.Session; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class CassandraMailQueueViewTestFactory { public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session, @@ -69,7 +71,9 @@ public class CassandraMailQueueViewTestFactory { public static boolean isInitialized(Session session, MailQueueName mailQueueName) { BrowseStartDAO browseStartDao = new BrowseStartDAO(session); return browseStartDao.findBrowseStart(mailQueueName) - .thenApply(Optional::isPresent) - .join(); + .map(Optional::ofNullable) + .switchIfEmpty(Mono.just(Optional.empty())) + .block() + .isPresent(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java index d9dc69a..e21804f 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java @@ -50,58 +50,58 @@ class DeletedMailsDAOTest { void markAsDeletedShouldWork() { Boolean isDeletedBeforeMark = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeletedBeforeMark).isFalse(); - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join(); + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block(); Boolean isDeletedAfterMark = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeletedAfterMark).isTrue(); } @Test void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() { - testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join(); + testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block(); Boolean isDeleted = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeleted).isFalse(); } @Test void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() { - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join(); + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block(); Boolean isDeleted = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeleted).isFalse(); } @Test void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() { - testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join(); + testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block(); Boolean isDeleted = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeleted).isFalse(); } @Test void checkDeletedShouldReturnTrueWhenTableContainsMailItem() { - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join(); + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block(); Boolean isDeleted = testee .isDeleted(OUT_GOING_1, MAIL_KEY_1) - .join(); + .block(); assertThat(isDeleted).isTrue(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java index 3d44db1..13b9e00 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java @@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.SoftAssertions.assertSoftly; import java.time.Instant; -import java.util.stream.Stream; +import java.util.List; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -86,11 +86,11 @@ class EnqueuedMailsDaoTest { .build()) .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW)) .build()) - .join(); + .block(); - Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee + List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID) - .join(); + .collectList().block(); assertThat(selectedEnqueuedMails).hasSize(1); } @@ -108,7 +108,7 @@ class EnqueuedMailsDaoTest { .build()) .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW)) .build()) - .join(); + .block(); testee.insert(EnqueuedItemWithSlicingContext.builder() .enqueuedItem(EnqueuedItem.builder() @@ -121,10 +121,10 @@ class EnqueuedMailsDaoTest { .build()) .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW)) .build()) - .join(); + .block(); - Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID) - .join(); + List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID) + .collectList().block(); assertThat(selectedEnqueuedMails) .hasSize(1) http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java index 842216c..1fb6916 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java @@ -64,13 +64,13 @@ class BucketedSlicesTest { @Test void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() { - assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW)) + assertThat(FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW)) .containsOnly(FIRST_SLICE); } @Test void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() { - Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW); + Stream<Slice> allSlices = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW); assertThat(allSlices) .containsExactly( @@ -81,9 +81,9 @@ class BucketedSlicesTest { @Test void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() { - Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW); - Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW); - Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW); + Stream<Slice> allSlicesEndAtTheStartOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW); + Stream<Slice> allSlicesEndAtTheMiddleOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW); + Stream<Slice> allSlicesEndAtTheEndWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW); Slice [] allSlicesInThreeHours = { FIRST_SLICE, @@ -102,7 +102,7 @@ class BucketedSlicesTest { @Test void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() { - Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW); + Stream<Slice> allSlices = FIRST_SLICE_NEXT_TWO_HOUR.allSlicesTill(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW); assertThat(allSlices).isEmpty(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
