This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e694f392989540a416765c224bee028327651eba Author: Benoit Tellier <[email protected]> AuthorDate: Mon May 4 10:36:06 2020 +0700 [Refactoring] Extract a ReactorUtils::unboxOptional utils This reduces the boiler plate around the use of handle, which optimizes Reactor synchronous transformations. --- .../james/backends/cassandra/utils/CassandraAsyncExecutor.java | 4 +++- .../james/mailbox/cassandra/mail/CassandraMessageMapper.java | 4 +++- .../james/mailbox/cassandra/mail/CassandraModSeqProvider.java | 8 +++++--- .../apache/james/mailbox/cassandra/mail/CassandraUidProvider.java | 7 +++++-- .../cassandra/mail/task/SolveMessageInconsistenciesService.java | 6 ++++-- .../james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java | 7 +++++-- .../mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java | 7 +++++-- .../mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java | 7 +++++-- .../events/ElasticSearchListeningMessageSearchIndex.java | 3 ++- .../main/java/org/apache/james/vault/metadata/MetadataDAO.java | 4 +++- .../util/src/main/java/org/apache/james/util/ReactorUtils.java | 5 +++++ .../james/mailrepository/cassandra/CassandraMailRepository.java | 4 +++- .../org/apache/james/jmap/draft/methods/GetMailboxesMethod.java | 8 +++++--- .../configuration/EventsourcingConfigurationManagement.java | 7 +++++-- .../eventsourcing/distributed/RabbitMQTerminationSubscriber.java | 5 ++++- .../main/java/org/apache/james/task/SerialTaskManagerWorker.java | 4 +++- 16 files changed, 65 insertions(+), 25 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 86b192b..24bf48e 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -19,6 +19,8 @@ package org.apache.james.backends.cassandra.utils; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.util.Optional; import javax.inject.Inject; @@ -66,7 +68,7 @@ public class CassandraAsyncExecutor { public Mono<Row> executeSingleRow(Statement statement) { return executeSingleRowOptional(statement) - .handle((t, sink) -> t.ifPresent(sink::next)); + .handle(publishIfPresent()); } public Flux<Row> executeRows(Statement statement) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 465c9cf..cb31434 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.time.Duration; import java.util.Comparator; import java.util.Iterator; @@ -312,7 +314,7 @@ public class CassandraMessageMapper implements MessageMapper { if (!failed.isEmpty()) { Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed) .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)) - .handle((t, sink) -> t.ifPresent(sink::next)); + .handle(publishIfPresent()); return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator); } else { return Mono.empty(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index 15f1a2d..fc1647c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update; import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ; import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import java.time.Duration; import java.util.Optional; @@ -157,7 +158,8 @@ public class CassandraModSeqProvider implements ModSeqProvider { insert.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.asLong())) - .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next)); + .map(success -> successToModSeq(nextModSeq, success)) + .handle(publishIfPresent()); } private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { @@ -167,7 +169,8 @@ public class CassandraModSeqProvider implements ModSeqProvider { .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(NEXT_MODSEQ, nextModSeq.asLong()) .setLong(MOD_SEQ_CONDITION, modSeq.asLong())) - .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next)); + .map(success -> successToModSeq(nextModSeq, success)) + .handle(publishIfPresent()); } private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { @@ -187,5 +190,4 @@ public class CassandraModSeqProvider implements ModSeqProvider { .single() .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic())); } - } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index fea0d5e..86fa061 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update; import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import java.time.Duration; import java.util.Optional; @@ -135,14 +136,16 @@ public class CassandraUidProvider implements UidProvider { .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(CONDITION, uid.asLong()) .setLong(NEXT_UID, nextUid.asLong())) - .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next)); + .map(success -> successToUid(nextUid, success)) + .handle(publishIfPresent()); } private Mono<MessageUid> tryInsert(CassandraId mailboxId) { return executor.executeReturnApplied( insertStatement.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid())) - .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next)); + .map(success -> successToUid(MessageUid.MIN_VALUE, success)) + .handle(publishIfPresent()); } private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java index 8518f1e..f0ec09b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra.mail.task; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.util.Collection; import java.util.Objects; import java.util.Optional; @@ -414,7 +416,7 @@ public class SolveMessageInconsistenciesService { private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData upToDateMessageFromImapUid) { return messageIdDAO.retrieve((CassandraId) upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), upToDateMessageFromImapUid.getComposedMessageId().getUid()) - .flatMap(Mono::justOrEmpty) + .handle(publishIfPresent()) .map(messageIdRecord -> { if (messageIdRecord.equals(upToDateMessageFromImapUid)) { return NO_INCONSISTENCY; @@ -433,7 +435,7 @@ public class SolveMessageInconsistenciesService { private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) { return messageIdDAO.retrieve((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid()) - .flatMap(Mono::justOrEmpty) + .handle(publishIfPresent()) .flatMap(upToDateMessage -> messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) .map(uidRecord -> NO_INCONSISTENCY) .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message))) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java index 74fd9bf..9ee9159 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java @@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import javax.inject.Inject; @@ -93,14 +94,16 @@ public class CassandraGlobalMaxQuotaDao { return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE)) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) - .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); + .map(QuotaCodec::longToQuotaSize) + .handle(publishIfPresent()); } Mono<QuotaCountLimit> getGlobalMaxMessage() { return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind() .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE)) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class))) - .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); + .map(QuotaCodec::longToQuotaCount) + .handle(publishIfPresent()); } Mono<Void> removeGlobaltMaxStorage() { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java index cb49d3a..38161ef 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java @@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import javax.inject.Inject; @@ -109,13 +110,15 @@ public class CassandraPerDomainMaxQuotaDao { Mono<QuotaSizeLimit> getMaxStorage(Domain domain) { return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString())) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class))) - .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); + .map(QuotaCodec::longToQuotaSize) + .handle(publishIfPresent()); } Mono<QuotaCountLimit> getMaxMessage(Domain domain) { return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString())) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class))) - .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); + .map(QuotaCodec::longToQuotaCount) + .handle(publishIfPresent()); } Mono<Void> removeMaxMessage(Domain domain) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java index cff560d..2b4e21b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java @@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import javax.inject.Inject; @@ -109,13 +110,15 @@ public class CassandraPerUserMaxQuotaDao { Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) { return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue())) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class))) - .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage))); + .map(QuotaCodec::longToQuotaSize) + .handle(publishIfPresent()); } Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) { return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue())) .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class))) - .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages))); + .map(QuotaCodec::longToQuotaCount) + .handle(publishIfPresent()); } Mono<Void> removeMaxMessage(QuotaRoot quotaRoot) { diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index 3d2d099..091db6b 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -18,6 +18,7 @@ ****************************************************************/ package org.apache.james.mailbox.elasticsearch.events; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import java.util.Collection; @@ -128,7 +129,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe return searcher.search(mailboxIds, searchQuery, Optional.empty()) .doOnNext(this::logIfNoMessageId) .map(SearchResult::getMessageId) - .flatMap(Mono::justOrEmpty) + .handle(publishIfPresent()) .distinct() .take(limit); } diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java index 87553e9..f03fd4e 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java @@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.BUCKET_NAME; import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.MESSAGE_ID; import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.OWNER; @@ -109,7 +110,8 @@ public class MetadataDAO { .setString(BUCKET_NAME, bucketName.asString()) .setString(OWNER, username.asString())) .map(row -> row.getString(PAYLOAD)) - .handle((json, sink) -> metadataSerializer.deserialize(json).ifPresent(sink::next)); + .map(metadataSerializer::deserialize) + .handle(publishIfPresent()); } Flux<MessageId> retrieveMessageIds(BucketName bucketName, Username username) { diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index df9c937..a477355 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -24,11 +24,13 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Optional; +import java.util.function.BiConsumer; import java.util.function.Consumer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.publisher.SynchronousSink; import reactor.util.context.Context; public class ReactorUtils { @@ -39,6 +41,9 @@ public class ReactorUtils { return Mono.fromRunnable(runnable).then(Mono.empty()); } + public static <T> BiConsumer<Optional<T>, SynchronousSink<T>> publishIfPresent() { + return (element, sink) -> element.ifPresent(sink::next); + } public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) { return new StreamInputStream(byteArrays.toIterable(1).iterator()); diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index cf98198..134d969 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -19,6 +19,8 @@ package org.apache.james.mailrepository.cassandra; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.util.Collection; import java.util.Iterator; @@ -93,7 +95,7 @@ public class CassandraMailRepository implements MailRepository { @Override public Mail retrieve(MailKey key) { return mailDAO.read(url, key) - .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next)) + .handle(publishIfPresent()) .flatMap(this::toMail) .blockOptional() .orElse(null); diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java index f74b243..f5936a9 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java @@ -20,6 +20,7 @@ package org.apache.james.jmap.draft.methods; import static org.apache.james.util.ReactorUtils.context; +import static org.apache.james.util.ReactorUtils.publishIfPresent; import java.util.Comparator; import java.util.List; @@ -144,7 +145,7 @@ public class GetMailboxesMethod implements Method { .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA) .build()) .subscribeOn(Schedulers.elastic())) - .handle((element, sink) -> element.ifPresent(sink::next)); + .handle(publishIfPresent()); } private Flux<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) { @@ -156,12 +157,13 @@ public class GetMailboxesMethod implements Method { return userMailboxesMono.zipWith(quotaLoaderMono) .flatMapMany( tuple -> Flux.fromIterable(tuple.getT1()) - .flatMap(mailboxMetaData -> Mono.justOrEmpty(mailboxFactory.builder() + .map(mailboxMetaData -> mailboxFactory.builder() .mailboxMetadata(mailboxMetaData) .session(mailboxSession) .usingPreloadedMailboxesMetadata(Optional.of(tuple.getT1())) .quotaLoader(tuple.getT2()) - .build()))); + .build()) + .handle(publishIfPresent())); } private Flux<MailboxMetaData> getAllMailboxesMetaData(MailboxSession mailboxSession) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java index ba2c621..6cacd12 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java @@ -19,6 +19,8 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import javax.inject.Inject; import org.apache.james.eventsourcing.AggregateId; @@ -53,9 +55,10 @@ public class EventsourcingConfigurationManagement { @VisibleForTesting Mono<CassandraMailQueueViewConfiguration> load() { return Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)) - .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate + .map(history -> ConfigurationAggregate .load(CONFIGURATION_AGGREGATE_ID, history) - .getCurrentConfiguration())); + .getCurrentConfiguration()) + .handle(publishIfPresent()); } public void registerConfiguration(CassandraMailQueueViewConfiguration newConfiguration) { diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 174101a..2c1812f 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -20,6 +20,8 @@ package org.apache.james.task.eventsourcing.distributed; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.util.Optional; @@ -91,7 +93,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta listenQueueHandle = listenerReceiver .consumeAutoAck(queueName) .subscribeOn(Schedulers.elastic()) - .<Event>handle((delivery, sink) -> toEvent(delivery).ifPresent(sink::next)) + .map(this::toEvent) + .handle(publishIfPresent()) .subscribe(listener::onNext); } diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index b024c4c..8341a16 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -18,6 +18,8 @@ ****************************************************************/ package org.apache.james.task; +import static org.apache.james.util.ReactorUtils.publishIfPresent; + import java.io.IOException; import java.time.Duration; import java.util.Optional; @@ -91,7 +93,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { return Mono.fromCallable(() -> taskWithId.getTask().details()) .delayElement(pollingInterval, Schedulers.elastic()) .repeat() - .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next)) + .handle(publishIfPresent()) .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
