This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bf600b02c74846cc7670bc241a42fe5f65360dae Author: Matthieu Baechler <[email protected]> AuthorDate: Tue Dec 10 14:45:03 2019 +0100 JAMES-3004 avoid starvation issues by using Schedulers.elastic instead of boundedElastic --- .../backends/cassandra/init/CassandraTableManager.java | 2 +- .../cassandra/init/ResilientClusterProvider.java | 4 ++-- .../backends/cassandra/utils/CassandraAsyncExecutor.java | 2 +- .../org/apache/james/backends/es/ClientProvider.java | 4 ++-- .../james/backends/es/ElasticSearchClusterExtension.java | 2 +- .../backends/rabbitmq/RabbitMQConnectionFactory.java | 2 +- .../backends/rabbitmq/ReactorRabbitMQChannelPool.java | 2 +- .../james/backends/rabbitmq/SimpleConnectionPool.java | 2 +- .../james/mailbox/backup/DefaultMailboxBackup.java | 2 +- .../mailbox/cassandra/mail/CassandraMessageDAO.java | 2 +- .../mailbox/cassandra/mail/CassandraMessageIdMapper.java | 4 ++-- .../mailbox/cassandra/mail/CassandraMessageMapper.java | 2 +- .../mailbox/cassandra/mail/CassandraModSeqProvider.java | 2 +- .../james/mailbox/events/delivery/EventDelivery.java | 2 +- .../org/apache/james/mailbox/events/EventDispatcher.java | 2 +- .../apache/james/mailbox/events/GroupRegistration.java | 2 +- .../james/mailbox/events/KeyRegistrationHandler.java | 2 +- .../apache/james/mailbox/events/WaitDelayGenerator.java | 2 +- .../james/mailbox/events/LocalListenerRegistryTest.java | 2 +- .../james/vault/blob/BlobStoreDeletedMessageVault.java | 2 +- .../vault/blob/BlobStoreVaultGarbageCollectionTask.java | 2 +- .../org/apache/james/mailbox/store/PreDeletionHooks.java | 2 +- .../apache/james/mailbox/store/StoreMessageManager.java | 2 +- .../src/main/java/org/apache/james/blob/api/Store.java | 2 +- .../apache/james/blob/cassandra/CassandraBlobStore.java | 6 +++--- .../james/blob/objectstorage/ObjectStorageBlobStore.java | 6 +++--- .../blob/objectstorage/StreamCompatibleBlobPutter.java | 6 +++--- .../james/blob/objectstorage/aws/AwsS3ObjectStorage.java | 4 ++-- 
.../blob/objectstorage/ObjectStorageBlobStoreTest.java | 8 ++++---- .../java/org/apache/james/StartUpChecksPerformer.java | 2 +- .../org/apache/james/GuiceLifecycleHeathCheckTest.java | 2 +- .../src/main/java/org/apache/james/util/Runnables.java | 4 ++-- .../java/org/apache/james/util/ReactorUtilsTest.java | 8 ++++---- .../jmap/api/projections/MessageFastViewProjection.java | 2 +- .../james/transport/mailets/delivery/MailDispatcher.java | 2 +- .../draft/model/message/view/MessageFastViewFactory.java | 2 +- .../draft/model/message/view/MessageFullViewFactory.java | 4 ++-- .../event/ComputeMessageFastViewProjectionListener.java | 2 +- .../apache/james/queue/api/DelayedMailQueueContract.java | 6 +++--- .../queue/api/DelayedPriorityMailQueueContract.java | 4 ++-- .../org/apache/james/queue/api/MailQueueContract.java | 16 ++++++++-------- .../james/queue/api/PriorityMailQueueContract.java | 8 ++++---- .../james/queue/memory/MemoryMailQueueFactory.java | 2 +- .../view/cassandra/CassandraMailQueueMailDelete.java | 2 +- .../distributed/RabbitMQTerminationSubscriber.java | 4 ++-- .../eventsourcing/distributed/RabbitMQWorkQueue.java | 6 +++--- .../eventsourcing/distributed/RabbitMQWorkQueueTest.java | 2 +- .../java/org/apache/james/task/MemoryTaskManager.java | 2 +- .../main/java/org/apache/james/task/MemoryWorkQueue.java | 2 +- .../org/apache/james/task/SerialTaskManagerWorker.java | 2 +- .../task/eventsourcing/EventSourcingTaskManager.scala | 2 +- .../eventsourcing/TerminationSubscriberContract.java | 6 +++--- 52 files changed, 88 insertions(+), 88 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java index 0f16dc2..30980f3 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java +++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java @@ -59,7 +59,7 @@ public class CassandraTableManager { public void clearAllTables() { CassandraAsyncExecutor executor = new CassandraAsyncExecutor(session); Flux.fromIterable(module.moduleTables()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .map(CassandraTable::getName) .flatMap(name -> truncate(executor, name)) .then() diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java index 657f118..01040bb 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java @@ -50,8 +50,8 @@ public class ResilientClusterProvider implements Provider<Cluster> { Duration forever = Duration.ofMillis(Long.MAX_VALUE); cluster = Mono.fromCallable(getClusterRetryCallable(configuration)) .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. 
Next retry scheduled in {} ms", waitDelay, e)) - .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.boundedElastic()) - .publishOn(Schedulers.boundedElastic()) + .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic()) + .publishOn(Schedulers.elastic()) .block(); } diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 50508cf..1b6464c 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -44,7 +44,7 @@ public class CassandraAsyncExecutor { public Mono<ResultSet> execute(Statement statement) { return Mono.defer(() -> Mono.fromFuture(FutureConverter .toCompletableFuture(session.executeAsync(statement))) - .publishOn(Schedulers.boundedElastic())); + .publishOn(Schedulers.elastic())); } public Mono<Boolean> executeReturnApplied(Statement statement) { diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java index 1ff14cd..b879a88 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -184,8 +184,8 @@ public class ClientProvider implements Provider<RestHighLevelClient> { return Mono.fromCallable(() -> connectToCluster(configuration)) .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. 
Next retry scheduled in {}", DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e)) - .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.boundedElastic()) - .publishOn(Schedulers.boundedElastic()) + .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic()) + .publishOn(Schedulers.elastic()) .block(); } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java index 41660b3..d0d3a5d 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java @@ -75,7 +75,7 @@ class ElasticSearchClusterExtension implements AfterAllCallback, BeforeAllCallba Flux.fromStream(Stream.of(runnables) .map(Mono::fromRunnable)) .parallel(runnables.length) - .runOn(Schedulers.boundedElastic()) + .runOn(Schedulers.elastic()) .flatMap(Function.identity()) .then() .block(); diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java index ccbc28b..1f4d0fd 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java @@ -60,6 +60,6 @@ public class RabbitMQConnectionFactory { Mono<Connection> connectionMono() { Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.fromCallable(connectionFactory::newConnection) - .retryBackoff(configuration.getMaxRetries(), 
Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.boundedElastic()); + .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic()); } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index a5dbea6..5ccfbe1 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -79,7 +79,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { return Mono.fromCallable(connection::openChannel) .map(maybeChannel -> maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels"))) - .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.boundedElastic()) + .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.elastic()) .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable)); } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java index d2e08e4..4ffebfd 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java @@ -57,7 +57,7 @@ public class SimpleConnectionPool implements AutoCloseable { Duration initialDelay = Duration.ofMillis(100); Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.defer(this::getOpenConnection) - .retryBackoff(numRetries, initialDelay, forever, 
Schedulers.boundedElastic()); + .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic()); } private Mono<Connection> getOpenConnection() { diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java index ab708f6..856c094 100644 --- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java +++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java @@ -115,7 +115,7 @@ public class DefaultMailboxBackup implements MailboxBackup { } return Mono.fromRunnable(Throwing.runnable(() -> archiveRestorer.restore(username, source)).sneakyThrow()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .doOnError(e -> LOGGER.error("Error during account restoration for user : " + username.asString(), e)) .doOnTerminate(Throwing.runnable(source::close).sneakyThrow()) .thenReturn(BackupStatus.DONE) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index b29e7e8..ac232a4 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -236,7 +236,7 @@ public class CassandraMessageDAO { public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(id -> retrieveRow(id, fetchType) .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize()); } diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index f317f6b..1b15350 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -88,7 +88,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) { return Flux.fromStream(messageIds.stream()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize()) .collectList() .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) @@ -174,7 +174,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { public void delete(Multimap<MessageId, MailboxId> ids) { Flux.fromIterable(ids.asMap() .entrySet()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize()) .then() .block(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index c43afe7..ebc29a7 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -139,7 +139,7 @@ public class CassandraMessageMapper implements MessageMapper { @Override public 
List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes) { return Flux.fromIterable(mailboxes) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .concatMap(this::getMailboxCountersAsMono) .toStream() .collect(Guavate.toImmutableList()); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index 7aeb45e..e5d7290 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -188,7 +188,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { Duration forever = Duration.ofMillis(Long.MAX_VALUE); return tryFindThenUpdateOnce(mailboxId) .single() - .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.boundedElastic()); + .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.elastic()); } private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index 0f56a60..7b2fb67 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -89,7 +89,7 @@ public interface EventDelivery { @Override public Mono<Void> doRetry(Mono<Void> executionResult, Event event) { return executionResult - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.boundedElastic()) + .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", mailboxListener.getClass().getCanonicalName(), retryBackoff.getMaxRetries(), diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 47844d7..85cc48c 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -80,7 +80,7 @@ class EventDispatcher { .concat( dispatchToLocalListeners(event, keys), dispatchToRemoteListeners(serializeEvent(event), keys)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable)) .then() .subscribeWith(MonoProcessor.create()); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index dac4ae3..5622412 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -138,7 +138,7 @@ class GroupRegistration implements Registration { int currentRetryCount = getRetryCount(acknowledgableDelivery); return delayGenerator.delayIfHaveTo(currentRetryCount) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event)))) .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) 
.then(Mono.fromRunnable(acknowledgableDelivery::ack)); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index e0f9001..41db3f8 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -128,7 +128,7 @@ class KeyRegistrationHandler { .log(logger -> logger.error("Exception happens when handling event", e))) .onErrorResume(e -> Mono.empty()) .then()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .then(); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java index 922ae02..33a3586 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java @@ -59,7 +59,7 @@ class WaitDelayGenerator { } return countRetryMono - .delayElement(generateDelay(retryCount), Schedulers.boundedElastic()); + .delayElement(generateDelay(retryCount), Schedulers.elastic()); } @VisibleForTesting diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java index 9bd70ec..8136239 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java @@ -247,7 +247,7 @@ class LocalListenerRegistryTest 
{ LocalListenerRegistry.LocalRegistration registration5 = testee.addListener(KEY_1, listener5); Mono<List<MailboxListener>> listeners = testee.getLocalMailboxListeners(KEY_1) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .delayElements(Duration.ofMillis(100)) .collectList(); diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java index 71115f8..7cc3905 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java @@ -154,7 +154,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { .flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), username, messageId)) .thenReturn(storageInformation)) .flatMap(storageInformation -> blobStore.delete(storageInformation.getBucketName(), storageInformation.getBlobId())) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); } @Override diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java index f82fc6a..61b4e5d 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java @@ -97,7 +97,7 @@ public class BlobStoreVaultGarbageCollectionTask implements Task { public Result run() { retentionOperation .doOnNext(deletedBuckets::add) - 
.subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .then() .block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java index 0429289..b868d62 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java @@ -50,7 +50,7 @@ public class PreDeletionHooks { public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) { return Flux.fromIterable(hooks) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME, Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY) .then(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index d985641..259cd7b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -732,7 +732,7 @@ public class StoreMessageManager implements MessageManager { MessageMapper messageMapper = mapperFactory.getMessageMapper(session); DeleteOperation deleteOperation = Flux.fromIterable(MessageRange.toRanges(uids)) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(range -> Mono.fromCallable(() -> messageMapper.findInMailbox(mailbox, range, FetchType.Metadata, UNLIMITED)) .flatMapMany(iterator -> Flux.fromStream(Iterators.toStream(iterator)))) .map(mailboxMessage -> MetadataWithMailboxId.from(mailboxMessage.metaData(), mailboxMessage.getMailboxId())) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 41910d5..5feef59 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -118,7 +118,7 @@ public interface Store<T, I> { @Override public Mono<T> read(I blobIds) { return Flux.fromIterable(blobIds.asMap().entrySet()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMapSequential( entry -> blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue()) .zipWith(Mono.just(entry.getKey()))) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index f725304..7f7efe3 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -92,7 +92,7 @@ public class CassandraBlobStore implements BlobStore { private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) { Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize()); return Flux.fromStream(chunks) - .publishOn(Schedulers.boundedElastic(), PREFETCH) + .publishOn(Schedulers.elastic(), PREFETCH) .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue()) .then(Mono.just(getChunkNum(pair)))) .collect(Collectors.maxBy(Comparator.comparingInt(x -> x))) @@ -129,13 +129,13 @@ public class CassandraBlobStore implements BlobStore { private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { Integer rowCount = selectRowCount(bucketName, blobId) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .single() .onErrorResume(NoSuchElementException.class, e -> Mono.error( new 
ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))) .block(); return Flux.range(0, rowCount) - .publishOn(Schedulers.boundedElastic(), PREFETCH) + .publishOn(Schedulers.elastic(), PREFETCH) .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex) .single() .onErrorResume(NoSuchElementException.class, e -> Mono.error( diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java index 014b354..8a0628e 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java @@ -186,7 +186,7 @@ public class ObjectStorageBlobStore implements BlobStore { public Mono<Void> deleteBucket(BucketName bucketName) { ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.<Void>fromRunnable(() -> blobStore.deleteContainer(resolvedBucketName.asString())) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); } public PayloadCodec getPayloadCodec() { @@ -196,7 +196,7 @@ public class ObjectStorageBlobStore implements BlobStore { @VisibleForTesting Mono<Void> deleteAllBuckets() { return Flux.fromIterable(blobStore.list()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .filter(storageMetadata -> storageMetadata.getType().equals(StorageType.CONTAINER)) .map(StorageMetadata::getName) .doOnNext(blobStore::deleteContainer) @@ -207,6 +207,6 @@ public class ObjectStorageBlobStore implements BlobStore { public Mono<Void> delete(BucketName bucketName, BlobId blobId) { ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.<Void>fromRunnable(() -> 
blobStore.removeBlob(resolvedBucketName.asString(), blobId.asString())) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); } } diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java index aa9e294..30ad89c 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java @@ -52,14 +52,14 @@ public class StreamCompatibleBlobPutter implements BlobPutter { @Override public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) { Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob)) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .retryWhen(Retry.onlyIf(retryContext -> needToCreateBucket(retryContext.exception(), bucketName)) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) - .withBackoffScheduler(Schedulers.boundedElastic()) + .withBackoffScheduler(Schedulers.elastic()) .retryMax(MAX_RETRIES) .doOnRetry(retryContext -> blobStore.createContainerInLocation(DEFAULT_LOCATION, bucketName.asString()))) .retryWhen(Retry.onlyIf(RetryContext -> isPutMethod(RetryContext.exception())) - .withBackoffScheduler(Schedulers.boundedElastic()) + .withBackoffScheduler(Schedulers.elastic()) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) .retryMax(RETRY_ONE_LAST_TIME_ON_CONCURRENT_SAVING)) .block(); diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java index 1ba0f67..db7cd67 100644 --- 
a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java @@ -177,11 +177,11 @@ public class AwsS3ObjectStorage { private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .retryWhen(Retry .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception())) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) - .withBackoffScheduler(Schedulers.boundedElastic()) + .withBackoffScheduler(Schedulers.elastic()) .retryMax(MAX_RETRY_ON_EXCEPTION) .doOnRetry(retryContext -> createBucket(bucketName, configuration))); } diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java index 9863601..5b05311 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java @@ -178,7 +178,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { // String need to be big enough to get async thread busy hence could not return result instantly Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8)) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -186,7 +186,7 @@ public class ObjectStorageBlobStoreTest implements 
MetricableBlobStoreContract { void saveStringShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), BIG_STRING) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -194,14 +194,14 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { void saveInputStreamShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @Test void readBytesShouldNotCompleteWhenDoesNotAwait() { BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING).block(); - Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.boundedElastic()); + Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.elastic()); assertThat(resultFuture.toFuture()).isNotCompleted(); } } diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java index e2d0f90..b04f2c3 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java @@ -84,7 +84,7 @@ public class StartUpChecksPerformer { public List<StartUpCheck.CheckResult> check() { return Flux.fromIterable(startUpChecks) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .map(this::checkQuietly) .collect(Guavate.toImmutableList()) .block(); diff --git 
a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java index d2af157..40d2e0f 100644 --- a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java +++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java @@ -122,7 +122,7 @@ class GuiceLifecycleHeathCheckTest { stopMono = Mono.fromRunnable(server::stop); stopMono - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .subscribeWith(MonoProcessor.create()); when() diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java index 7ed8850..b0f707e 100644 --- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java +++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java @@ -31,9 +31,9 @@ public class Runnables { public static void runParallel(Flux<Runnable> runnables) { runnables - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .parallel() - .runOn(Schedulers.boundedElastic()) + .runOn(Schedulers.elastic()) .flatMap(runnable -> { runnable.run(); return Mono.empty(); diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index e900e05..7bdc678 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -85,7 +85,7 @@ class ReactorUtilsTest { void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.range(0, 10) - 
.subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)) .map(index -> new byte[] {(byte) (int) index}) @@ -104,7 +104,7 @@ class ReactorUtilsTest { void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -122,7 +122,7 @@ class ReactorUtilsTest { void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -140,7 +140,7 @@ class ReactorUtilsTest { void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.<byte[]>empty() - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java index c9c2e49..6589318 100644 --- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java @@ -51,6 +51,6 @@ public interface MessageFastViewProjection { .flatMap(messageId -> Mono.from(this.retrieve(messageId)) .map(preview -> Pair.of(messageId, preview))) .collectMap(Pair::getLeft, Pair::getRight) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); } } diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java index 4737012..1b3c1c7 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java @@ -94,7 +94,7 @@ public class MailDispatcher { this.mailStore = mailStore; this.consume = consume; this.mailetContext = mailetContext; - this.scheduler = Schedulers.boundedElastic(); + this.scheduler = Schedulers.elastic(); } public void dispatch(Mail mail) throws MessagingException { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java index 6fd35d8..7d7334b 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java @@ -123,7 +123,7 @@ public class MessageFastViewFactory implements MessageViewFactory<MessageFastVie return Mono.from(fastViewProjection.retrieve(messageIds)) .flatMapMany(fastProjections -> 
gatherMessageViews(messageIdSet, mailboxSession, fastProjections)) .collectList() - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .block(); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java index c806214..364d1be 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java @@ -131,7 +131,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie return Mono.from(fastViewProjection.retrieve(messageId)) .onErrorResume(throwable -> fallBackToCompute(messageContent, hasAttachments, throwable)) .switchIfEmpty(computeThenStoreAsync(messageContent, messageId, hasAttachments)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .block(); } @@ -148,7 +148,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie return computeProjection(messageContent, hasAttachments) .doOnNext(projection -> Mono.from(fastViewProjection.store(messageId, projection)) .doOnError(throwable -> LOGGER.error("Cannot store the projection to MessageFastViewProjection", throwable)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .subscribe()); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java index 877f05b..87ce35f 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java +++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java @@ -87,7 +87,7 @@ public class ComputeMessageFastViewProjectionListener implements MailboxListener .flatMap(Throwing.function(messageResult -> Mono.fromCallable( () -> Pair.of(messageResult.getMessageId(), computeFastViewPrecomputedProperties(messageResult))) .subscribeOn(Schedulers.parallel()))) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.elastic()) .flatMap(message -> messageFastViewProjection.store(message.getKey(), message.getValue())) .then() .block(); diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java index 588b9e7..e0b51a0 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java @@ -47,7 +47,7 @@ public interface DelayedMailQueueContract { 5L, TimeUnit.SECONDS); - Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } @@ -72,7 +72,7 @@ public interface DelayedMailQueueContract { 365 * 10, TimeUnit.DAYS); - Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } @@ -84,7 +84,7 @@ public interface DelayedMailQueueContract { .build(), ChronoUnit.FOREVER.getDuration()); 
- Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java index def35bd..03d473e 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java @@ -53,7 +53,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra Thread.sleep(unit.toMillis(2 * delay)); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem item1 = mailQueueItems.next(); item1.done(true); MailQueue.MailQueueItem item2 = mailQueueItems.next(); @@ -79,7 +79,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra delay, unit); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem item1 = mailQueueItems.next(); item1.done(true); MailQueue.MailQueueItem item2 = mailQueueItems.next(); diff --git 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index cec9783..1b21737 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -332,7 +332,7 @@ public interface MailQueueContract { .name(secondExpectedName) .build()); - Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = items.next(); @@ -351,7 +351,7 @@ public interface MailQueueContract { .build()); Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()) - .subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + .subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); MailQueue.MailQueueItem mailQueueItem2 = items.next(); mailQueueItem1.done(true); @@ -370,7 +370,7 @@ public interface MailQueueContract { .name("name2") .build()); - Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); MailQueue.MailQueueItem mailQueueItem2 = items.next(); mailQueueItem2.done(true); @@ -385,7 +385,7 @@ public interface MailQueueContract { .name("name1") .build()); - Iterator<MailQueue.MailQueueItem> items = 
Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); mailQueueItem1.done(false); MailQueue.MailQueueItem mailQueueItem2 = items.next(); @@ -401,7 +401,7 @@ public interface MailQueueContract { .build()); LinkedBlockingQueue<MailQueue.MailQueueItem> queue = new LinkedBlockingQueue<>(1); - Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).subscribe(Throwing.consumer(queue::put)); + Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).subscribe(Throwing.consumer(queue::put)); queue.take(); assertThat(queue.poll(2, TimeUnit.SECONDS)).isNull(); @@ -409,7 +409,7 @@ public interface MailQueueContract { @Test default void deQueueShouldBlockWhenNoMail() { - Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); + Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); assertThatThrownBy(() -> item.block(Duration.ofSeconds(2))) .isInstanceOf(RuntimeException.class); @@ -440,7 +440,7 @@ public interface MailQueueContract { LinkedBlockingQueue<MailQueue.MailQueueItem> itemQueue = new LinkedBlockingQueue<>(1); Flux.from(testee .deQueue()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .flatMap(e -> { try { itemQueue.put(e); @@ -482,7 +482,7 @@ public interface MailQueueContract { int operationCount = 15; int totalDequeuedMessages = 50; LinkedBlockingDeque<MailQueue.MailQueueItem> deque = new LinkedBlockingDeque<>(); - Flux.from(testee.deQueue()).subscribeOn(Schedulers.boundedElastic()).doOnNext(deque::addFirst).subscribe(); + Flux.from(testee.deQueue()).subscribeOn(Schedulers.elastic()).doOnNext(deque::addFirst).subscribe(); 
ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { if (step % 3 == 0) { diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java index a971218..c8da0a6 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java @@ -117,7 +117,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(1)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -137,7 +137,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(8)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -161,7 +161,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(6)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = 
Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -187,7 +187,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(6)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index 2066a7c..8a6186c 100644 --- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -97,7 +97,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu this.name = name; this.flux = Mono.fromCallable(mailItems::take) .repeat() - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .flatMap(item -> Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item)) .map(mailQueueItemDecoratorFactory::decorate); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index ae273f7..a08318e 100644 --- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -62,7 +62,7 @@ public class CassandraMailQueueMailDelete { void updateBrowseStart(MailQueueName mailQueueName) { findNewBrowseStart(mailQueueName) .flatMap(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .subscribe(); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 44a17f5..b04ea22 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -86,14 +86,14 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta sendQueue = UnicastProcessor.create(); sendQueueHandle = sender .send(sendQueue) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .subscribe(); listenerReceiver = channelPool.createReceiver(); listener = DirectProcessor.create(); listenQueueHandle = listenerReceiver .consumeAutoAck(queueName) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .concatMap(this::toEvent) .subscribe(listener::onNext); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 27196f8..eb2044d 100644 --- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -103,7 +103,7 @@ public class RabbitMQWorkQueue implements WorkQueue { private void consumeWorkqueue() { receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono())); receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .concatMap(this::executeTask) .subscribe(); } @@ -150,7 +150,7 @@ public class RabbitMQWorkQueue implements WorkQueue { sendCancelRequestsQueue = UnicastProcessor.create(); sendCancelRequestsQueueHandle = cancelRequestSender .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .subscribe(); } @@ -158,7 +158,7 @@ public class RabbitMQWorkQueue implements WorkQueue { cancelRequestListener = channelPool.createReceiver(); cancelRequestListenerHandle = cancelRequestListener .consumeAutoAck(queueName) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .map(this::readCancelRequestMessage) .doOnNext(worker::cancelTask) .subscribe(); diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java index 5b8bb43..ef6fd07 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java @@ -84,7 +84,7 @@ class RabbitMQWorkQueueTest { tasks.add(taskWithId); return Mono.fromCallable(() -> 
taskWithId.getTask().run()) .doOnNext(result -> results.add(result)) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.elastic()); } @Override diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java index 752c1e6..afe9a12 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -152,7 +152,7 @@ public class MemoryTaskManager implements TaskManager { @Override public TaskExecutionDetails await(TaskId id, Duration timeout) throws TaskNotFoundException, ReachedTimeoutException { try { - return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.boundedElastic()) + return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) .map(ignored -> getExecutionDetails(id)) .filter(details -> details.getStatus().isFinished()) .blockFirst(timeout); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java index a782f9b..4446bd4 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java @@ -35,7 +35,7 @@ public class MemoryWorkQueue implements WorkQueue { this.worker = worker; this.tasks = UnicastProcessor.create(); this.subscription = tasks - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .limitRate(1) .concatMap(this::dispatchTaskToWorker) .subscribe(); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 25005fd..ed35478 100644 --- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -87,7 +87,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) { return Mono.fromCallable(() -> taskWithId.getTask().details()) - .delayElement(pollingInterval, Schedulers.boundedElastic()) + .delayElement(pollingInterval, Schedulers.elastic()) .repeat() .flatMap(Mono::justOrEmpty) .doOnNext(information -> listener.updated(taskWithId.getId(), information)); diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 41eeb88..025d9ef 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -113,7 +113,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] .then(details) Flux.merge(findEvent, details) - .subscribeOn(Schedulers.boundedElastic) + .subscribeOn(Schedulers.elastic) .blockFirst(timeout) } catch { case _: IllegalStateException => throw new ReachedTimeoutException diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java index 4ddf181..b63c170 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java @@ -127,7 +127,7 @@ public 
interface TerminationSubscriberContract { List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2))) .then(Mono.defer(() -> collectEvents(subscriber.listenEvents()))) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .block(); assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT); } @@ -139,7 +139,7 @@ public interface TerminationSubscriberContract { default Mono<List<Event>> collectEvents(Publisher<Event> listener) { return Flux.from(listener) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7))) .collectList(); } @@ -147,7 +147,7 @@ public interface TerminationSubscriberContract { default void sendEvents(TerminationSubscriber subscriber, Event... events) { Mono.delay(DELAY_BEFORE_PUBLISHING) .flatMapMany(ignored -> Flux.fromArray(events) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.elastic()) .delayElements(DELAY_BETWEEN_EVENTS) .doOnNext(subscriber::handle)) .subscribe(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
