This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d86bf0801a73e887d5c77d3d2320dd25c8185431 Author: RĂ©mi Kowalski <rkowal...@linagora.com> AuthorDate: Wed Sep 25 10:46:27 2019 +0200 JAMES-2899 replace elastic schedulers by boundedElastic ones --- .../backends/cassandra/init/CassandraTableManager.java | 2 +- .../backends/cassandra/utils/CassandraAsyncExecutor.java | 2 +- .../org/apache/james/backends/es/ClientProvider.java | 4 ++-- .../backends/rabbitmq/RabbitMQConnectionFactory.java | 2 +- .../backends/rabbitmq/ReactorRabbitMQChannelPool.java | 2 +- .../james/backends/rabbitmq/SimpleChannelPool.java | 4 ++-- .../james/backends/rabbitmq/SimpleConnectionPool.java | 2 +- .../james/mailbox/backup/DefaultMailboxBackup.java | 2 +- .../mailbox/cassandra/mail/CassandraMessageDAO.java | 2 +- .../mailbox/cassandra/mail/CassandraMessageIdMapper.java | 4 ++-- .../mailbox/cassandra/mail/CassandraModSeqProvider.java | 2 +- .../james/mailbox/events/delivery/EventDelivery.java | 2 +- .../james/mailbox/events/delivery/InVmEventDelivery.java | 4 ++-- .../org/apache/james/mailbox/events/EventDispatcher.java | 2 +- .../apache/james/mailbox/events/GroupRegistration.java | 2 +- .../james/mailbox/events/KeyRegistrationHandler.java | 2 +- .../apache/james/mailbox/events/WaitDelayGenerator.java | 2 +- .../james/mailbox/events/LocalListenerRegistryTest.java | 2 +- .../james/vault/blob/BlobStoreDeletedMessageVault.java | 2 +- .../vault/blob/BlobStoreVaultGarbageCollectionTask.java | 2 +- .../org/apache/james/mailbox/store/PreDeletionHooks.java | 2 +- .../apache/james/mailbox/store/StoreMessageManager.java | 2 +- .../src/main/java/org/apache/james/blob/api/Store.java | 2 +- .../apache/james/blob/cassandra/CassandraBlobStore.java | 6 +++--- .../james/blob/objectstorage/ObjectStorageBlobsDAO.java | 6 +++--- .../blob/objectstorage/StreamCompatibleBlobPutter.java | 6 +++--- .../james/blob/objectstorage/aws/AwsS3ObjectStorage.java | 4 ++-- .../blob/objectstorage/ObjectStorageBlobsDAOTest.java | 8 ++++---- .../james/modules/mailbox/ResilientClusterProvider.java | 4 ++-- .../java/org/apache/james/StartUpChecksPerformer.java | 2 +- .../org/apache/james/GuiceLifecycleHeathCheckTest.java | 2 +- .../src/main/java/org/apache/james/util/Runnables.java | 4 ++-- .../java/org/apache/james/util/ReactorUtilsTest.java | 8 ++++---- .../james/mailetcontainer/impl/JamesMailSpooler.java | 4 ++-- .../apache/james/queue/api/DelayedMailQueueContract.java | 6 +++--- .../queue/api/DelayedPriorityMailQueueContract.java | 4 ++-- .../org/apache/james/queue/api/MailQueueContract.java | 16 ++++++++-------- .../james/queue/api/PriorityMailQueueContract.java | 8 ++++---- .../view/cassandra/CassandraMailQueueMailDelete.java | 2 +- .../distributed/RabbitMQTerminationSubscriber.java | 4 ++-- .../eventsourcing/distributed/RabbitMQWorkQueue.java | 6 +++--- .../distributed/RabbitMQTerminationSubscriberTest.java | 2 +- .../java/org/apache/james/task/MemoryTaskManager.java | 2 +- .../main/java/org/apache/james/task/MemoryWorkQueue.java | 2 +- .../task/eventsourcing/EventSourcingTaskManager.scala | 2 +- .../eventsourcing/TerminationSubscriberContract.java | 8 ++++---- 46 files changed, 85 insertions(+), 85 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java index 30980f3..0f16dc2 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java @@ -59,7 +59,7 @@ public class CassandraTableManager { public void clearAllTables() { CassandraAsyncExecutor executor = new CassandraAsyncExecutor(session); Flux.fromIterable(module.moduleTables()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .map(CassandraTable::getName) .flatMap(name -> truncate(executor, name)) .then() diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 1b6464c..50508cf 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -44,7 +44,7 @@ public class CassandraAsyncExecutor { public Mono<ResultSet> execute(Statement statement) { return Mono.defer(() -> Mono.fromFuture(FutureConverter .toCompletableFuture(session.executeAsync(statement))) - .publishOn(Schedulers.elastic())); + .publishOn(Schedulers.boundedElastic())); } public Mono<Boolean> executeReturnApplied(Statement statement) { diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java index 9fb7f1a..f6deb48 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -60,8 +60,8 @@ public class ClientProvider implements Provider<RestHighLevelClient> { return Mono.fromCallable(() -> connectToCluster(configuration)) .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}", DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e)) - .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic()) - .publishOn(Schedulers.elastic()) + .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.boundedElastic()) + .publishOn(Schedulers.boundedElastic()) .block(); } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java index 1f4d0fd..ccbc28b 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java @@ -60,6 +60,6 @@ public class RabbitMQConnectionFactory { Mono<Connection> connectionMono() { Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.fromCallable(connectionFactory::newConnection) - .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic()); + .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.boundedElastic()); } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index dfba0ba..9e66225 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -72,7 +72,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { return Mono.fromCallable(connection::openChannel) .map(maybeChannel -> maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels"))) - .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.elastic()) + .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.boundedElastic()) .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable)); } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java index f99ab43..c322af7 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java @@ -87,13 +87,13 @@ public class SimpleChannelPool implements RabbitMQChannelPool { Duration initialDelay = Duration.ofMillis(100); Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.defer(this::getOpenChannel) - .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic()); + .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic()); } private Mono<Channel> getOpenChannel() { Channel previous = channelReference.get(); return Mono.justOrEmpty(previous) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .filter(Channel::isOpen) .switchIfEmpty(connectionPool.getResilientConnection() .flatMap(connection -> Mono.fromCallable(connection::createChannel))) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java index 4ffebfd..d2e08e4 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java @@ -57,7 +57,7 @@ public class SimpleConnectionPool implements AutoCloseable { Duration initialDelay = Duration.ofMillis(100); Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.defer(this::getOpenConnection) - .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic()); + .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic()); } private Mono<Connection> getOpenConnection() { diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java index 355ec7e..b1f2a23 100644 --- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java +++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java @@ -114,7 +114,7 @@ public class DefaultMailboxBackup implements MailboxBackup { } return Mono.fromRunnable(Throwing.runnable(() -> archiveRestorer.restore(user, source)).sneakyThrow()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .doOnError(e -> LOGGER.error("Error during account restoration for user : " + user, e)) .doOnTerminate(Throwing.runnable(source::close).sneakyThrow()) .thenReturn(BackupStatus.DONE) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index f2e5319..2848c69 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -233,7 +233,7 @@ public class CassandraMessageDAO { public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(id -> retrieveRow(id, fetchType) .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize()); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index a6637b8..c69b4e5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -91,7 +91,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) { return Flux.fromStream(messageIds.stream()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize()) .collectList() .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) @@ -177,7 +177,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { public void delete(Multimap<MessageId, MailboxId> ids) { Flux.fromIterable(ids.asMap() .entrySet()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize()) .then() .block(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index efaf09f..97fe21f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -192,7 +192,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { Duration forever = Duration.ofMillis(Long.MAX_VALUE); return tryFindThenUpdateOnce(mailboxId) .single() - .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.elastic()); + .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.boundedElastic()); } private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index 7b2fb67..0f56a60 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -89,7 +89,7 @@ public interface EventDelivery { @Override public Mono<Void> doRetry(Mono<Void> executionResult, Event event) { return executionResult - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.boundedElastic()) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", mailboxListener.getClass().getCanonicalName(), retryBackoff.getMaxRetries(), diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java index a45f36c..da23d3a 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java @@ -71,13 +71,13 @@ public class InVmEventDelivery implements EventDelivery { Mono<Void> deliveryToListener = Mono.fromRunnable(() -> doDeliverToListener(listener, event)) .doOnError(throwable -> structuredLogger(event, listener) .log(logger -> logger.error("Error while processing listener", throwable))) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .then(); return deliveryOption.getRetrier().doRetry(deliveryToListener, event) .onErrorResume(throwable -> deliveryOption.getPermanentFailureHandler().handle(event)) .subscribeWith(MonoProcessor.create()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .then(); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index ebfa205..e144adb 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -80,7 +80,7 @@ class EventDispatcher { .concat( dispatchToLocalListeners(event, keys), dispatchToRemoteListeners(serializeEvent(event), keys)) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable)) .then() .subscribeWith(MonoProcessor.create()); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 79aa6a8..953a4fb 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -139,7 +139,7 @@ class GroupRegistration implements Registration { int currentRetryCount = getRetryCount(acknowledgableDelivery); return delayGenerator.delayIfHaveTo(currentRetryCount) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event)))) .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) .then(Mono.fromRunnable(acknowledgableDelivery::ack)); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index 7910152..1f2fbae 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -130,7 +130,7 @@ class KeyRegistrationHandler { .log(logger -> logger.error("Exception happens when handling event", e))) .onErrorResume(e -> Mono.empty()) .then()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .then(); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java index 33a3586..922ae02 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java @@ -59,7 +59,7 @@ class WaitDelayGenerator { } return countRetryMono - .delayElement(generateDelay(retryCount), Schedulers.elastic()); + .delayElement(generateDelay(retryCount), Schedulers.boundedElastic()); } @VisibleForTesting diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java index 8136239..9bd70ec 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java @@ -247,7 +247,7 @@ class LocalListenerRegistryTest { LocalListenerRegistry.LocalRegistration registration5 = testee.addListener(KEY_1, listener5); Mono<List<MailboxListener>> listeners = testee.getLocalMailboxListeners(KEY_1) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .delayElements(Duration.ofMillis(100)) .collectList(); diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java index ac9c1b7..766bea6 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java @@ -154,7 +154,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { .flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), user, messageId)) .thenReturn(storageInformation)) .flatMap(storageInformation -> blobStore.delete(storageInformation.getBucketName(), storageInformation.getBlobId())) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); } @Override diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java index cf49b6a..827658d 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java @@ -88,7 +88,7 @@ public class BlobStoreVaultGarbageCollectionTask implements Task { public Result run() { retentionOperation .doOnNext(deletedBuckets::add) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .then() .block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java index b868d62..0429289 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java @@ -50,7 +50,7 @@ public class PreDeletionHooks { public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) { return Flux.fromIterable(hooks) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME, Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY) .then(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index e8c3a80..76e5650 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -772,7 +772,7 @@ public class StoreMessageManager implements MessageManager { MessageMapper messageMapper = mapperFactory.getMessageMapper(session); DeleteOperation deleteOperation = Flux.fromIterable(MessageRange.toRanges(uids)) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMap(range -> Mono.fromCallable(() -> messageMapper.findInMailbox(mailbox, range, FetchType.Metadata, UNLIMITED)) .flatMapMany(iterator -> Flux.fromStream(Iterators.toStream(iterator)))) .map(mailboxMessage -> MetadataWithMailboxId.from(mailboxMessage.metaData(), mailboxMessage.getMailboxId())) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 5feef59..41910d5 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -118,7 +118,7 @@ public interface Store<T, I> { @Override public Mono<T> read(I blobIds) { return Flux.fromIterable(blobIds.asMap().entrySet()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .flatMapSequential( entry -> blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue()) .zipWith(Mono.just(entry.getKey()))) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 7f7efe3..f725304 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -92,7 +92,7 @@ public class CassandraBlobStore implements BlobStore { private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) { Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize()); return Flux.fromStream(chunks) - .publishOn(Schedulers.elastic(), PREFETCH) + .publishOn(Schedulers.boundedElastic(), PREFETCH) .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue()) .then(Mono.just(getChunkNum(pair)))) .collect(Collectors.maxBy(Comparator.comparingInt(x -> x))) @@ -129,13 +129,13 @@ public class CassandraBlobStore implements BlobStore { private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { Integer rowCount = selectRowCount(bucketName, blobId) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .single() .onErrorResume(NoSuchElementException.class, e -> Mono.error( new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))) .block(); return Flux.range(0, rowCount) - .publishOn(Schedulers.elastic(), PREFETCH) + .publishOn(Schedulers.boundedElastic(), PREFETCH) .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex) .single() .onErrorResume(NoSuchElementException.class, e -> Mono.error( diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java index 6352d07..bec6b3b 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java @@ -186,7 +186,7 @@ public class ObjectStorageBlobsDAO implements BlobStore { public Mono<Void> deleteBucket(BucketName bucketName) { ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.<Void>fromRunnable(() -> blobStore.deleteContainer(resolvedBucketName.asString())) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); } public PayloadCodec getPayloadCodec() { @@ -196,7 +196,7 @@ public class ObjectStorageBlobsDAO implements BlobStore { @VisibleForTesting Mono<Void> deleteAllBuckets() { return Flux.fromIterable(blobStore.list()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .filter(storageMetadata -> storageMetadata.getType().equals(StorageType.CONTAINER)) .map(StorageMetadata::getName) .doOnNext(blobStore::deleteContainer) @@ -207,6 +207,6 @@ public class ObjectStorageBlobsDAO implements BlobStore { public Mono<Void> delete(BucketName bucketName, BlobId blobId) { ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.<Void>fromRunnable(() -> blobStore.removeBlob(resolvedBucketName.asString(), blobId.asString())) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); } } diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java index 30ad89c..aa9e294 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java @@ -52,14 +52,14 @@ public class StreamCompatibleBlobPutter implements BlobPutter { @Override public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) { Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob)) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .retryWhen(Retry.onlyIf(retryContext -> needToCreateBucket(retryContext.exception(), bucketName)) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) - .withBackoffScheduler(Schedulers.elastic()) + .withBackoffScheduler(Schedulers.boundedElastic()) .retryMax(MAX_RETRIES) .doOnRetry(retryContext -> blobStore.createContainerInLocation(DEFAULT_LOCATION, bucketName.asString()))) .retryWhen(Retry.onlyIf(RetryContext -> isPutMethod(RetryContext.exception())) - .withBackoffScheduler(Schedulers.elastic()) + .withBackoffScheduler(Schedulers.boundedElastic()) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) .retryMax(RETRY_ONE_LAST_TIME_ON_CONCURRENT_SAVING)) .block(); diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java index 248accf..58855d1 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java @@ -177,11 +177,11 @@ public class AwsS3ObjectStorage { private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow()) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .retryWhen(Retry .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception())) .exponentialBackoff(FIRST_BACK_OFF, FOREVER) - .withBackoffScheduler(Schedulers.elastic()) + .withBackoffScheduler(Schedulers.boundedElastic()) .retryMax(MAX_RETRY_ON_EXCEPTION) .doOnRetry(retryContext -> createBucket(bucketName, configuration))); } diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java index 5bab274..cb8247b 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java @@ -178,7 +178,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { // String need to be big enough to get async thread busy hence could not return result instantly Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8)) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -186,7 +186,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { void saveStringShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), BIG_STRING) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -194,14 +194,14 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { void saveInputStreamShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))) - .subscribeOn(Schedulers.elastic()); + .subscribeOn(Schedulers.boundedElastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @Test void readBytesShouldNotCompleteWhenDoesNotAwait() { BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING).block(); - Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.elastic()); + Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.boundedElastic()); assertThat(resultFuture.toFuture()).isNotCompleted(); } } diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java index 5259233..e28e6a4 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java @@ -52,8 +52,8 @@ public class ResilientClusterProvider implements Provider<Cluster> { Duration forever = Duration.ofMillis(Long.MAX_VALUE); cluster = Mono.fromCallable(getClusterRetryCallable(configuration)) .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e)) - .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic()) - .publishOn(Schedulers.elastic()) + .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.boundedElastic()) + .publishOn(Schedulers.boundedElastic()) .block(); } diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java index b04f2c3..e2d0f90 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java @@ -84,7 +84,7 @@ public class StartUpChecksPerformer { public List<StartUpCheck.CheckResult> check() { return Flux.fromIterable(startUpChecks) - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .map(this::checkQuietly) .collect(Guavate.toImmutableList()) .block(); diff --git a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java index 1a12e66..785925f 100644 --- a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java +++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java @@ -122,7 +122,7 @@ class GuiceLifecycleHeathCheckTest { stopMono = Mono.fromRunnable(server::stop); stopMono - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .subscribeWith(MonoProcessor.create()); when() diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java index b0f707e..7ed8850 100644 --- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java +++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java @@ -31,9 +31,9 @@ public class Runnables { public static void runParallel(Flux<Runnable> runnables) { runnables - .publishOn(Schedulers.elastic()) + .publishOn(Schedulers.boundedElastic()) .parallel() - .runOn(Schedulers.elastic()) + .runOn(Schedulers.boundedElastic()) .flatMap(runnable -> { runnable.run(); return Mono.empty(); diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 7bdc678..e900e05 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -85,7 +85,7 @@ class ReactorUtilsTest { void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.range(0, 10) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)) .map(index -> new byte[] {(byte) (int) index}) @@ -104,7 +104,7 @@ class ReactorUtilsTest { void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -122,7 +122,7 @@ class ReactorUtilsTest { void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); @@ -140,7 +140,7 @@ class ReactorUtilsTest { void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.<byte[]>empty() - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 9333f88..bf308ce 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -118,7 +118,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB .publishOn(spooler) .flatMap(this::handleOnQueueItem) .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } @@ -138,7 +138,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private Mono<Void> processMail(MailQueueItem queueItem) { Mail mail = queueItem.getMail(); return Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .then(Mono.fromCallable(() -> performProcessMail(mail))) .flatMap(any -> acknowledgeItem(queueItem, true)) .onErrorResume(any -> acknowledgeItem(queueItem, false)) diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java index e0b51a0..588b9e7 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java @@ -47,7 +47,7 @@ public interface DelayedMailQueueContract { 5L, TimeUnit.SECONDS); - Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } @@ -72,7 +72,7 @@ public interface DelayedMailQueueContract { 365 * 10, TimeUnit.DAYS); - Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } @@ -84,7 +84,7 @@ public interface DelayedMailQueueContract { .build(), ChronoUnit.FOREVER.getDuration()); - Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); + Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); assertThatThrownBy(() -> next.block(Duration.ofSeconds(1))) .isInstanceOf(RuntimeException.class); } diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java index 03d473e..def35bd 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java @@ -53,7 +53,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra Thread.sleep(unit.toMillis(2 * delay)); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem item1 = mailQueueItems.next(); item1.done(true); MailQueue.MailQueueItem item2 = mailQueueItems.next(); @@ -79,7 +79,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra delay, unit); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem item1 = mailQueueItems.next(); item1.done(true); MailQueue.MailQueueItem item2 = mailQueueItems.next(); diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index 429f067..f1d57c8 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -331,7 +331,7 @@ public interface MailQueueContract { .name(secondExpectedName) .build()); - Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = items.next(); @@ -350,7 +350,7 @@ public interface MailQueueContract { .build()); Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()) - .subscribeOn(Schedulers.elastic()).toIterable().iterator(); + .subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); MailQueue.MailQueueItem mailQueueItem2 = items.next(); mailQueueItem1.done(true); @@ -369,7 +369,7 @@ public interface MailQueueContract { .name("name2") .build()); - Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); MailQueue.MailQueueItem mailQueueItem2 = items.next(); mailQueueItem2.done(true); @@ -384,7 +384,7 @@ public interface MailQueueContract { .name("name1") .build()); - Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = items.next(); mailQueueItem1.done(false); MailQueue.MailQueueItem mailQueueItem2 = items.next(); @@ -400,7 +400,7 @@ public interface MailQueueContract { .build()); LinkedBlockingQueue<MailQueue.MailQueueItem> queue = new LinkedBlockingQueue<>(1); - Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).subscribe(Throwing.consumer(queue::put)); + Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).subscribe(Throwing.consumer(queue::put)); queue.take(); assertThat(queue.poll(2, TimeUnit.SECONDS)).isNull(); @@ -408,7 +408,7 @@ public interface MailQueueContract { @Test default void deQueueShouldBlockWhenNoMail() { - Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next(); + Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next(); assertThatThrownBy(() -> item.block(Duration.ofSeconds(2))) .isInstanceOf(RuntimeException.class); @@ -439,7 +439,7 @@ public interface MailQueueContract { LinkedBlockingQueue<MailQueue.MailQueueItem> itemQueue = new LinkedBlockingQueue<>(1); Flux.from(testee .deQueue()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(e -> { try { itemQueue.put(e); @@ -481,7 +481,7 @@ public interface MailQueueContract { int operationCount = 15; int totalDequeuedMessages = 50; LinkedBlockingDeque<MailQueue.MailQueueItem> deque = new LinkedBlockingDeque<>(); - Flux.from(testee.deQueue()).subscribeOn(Schedulers.elastic()).doOnNext(deque::addFirst).subscribe(); + Flux.from(testee.deQueue()).subscribeOn(Schedulers.boundedElastic()).doOnNext(deque::addFirst).subscribe(); ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { if (step % 3 == 0) { diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java index c8da0a6..a971218 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java @@ -117,7 +117,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(1)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -137,7 +137,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(8)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -161,7 +161,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(6)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); @@ -187,7 +187,7 @@ public interface PriorityMailQueueContract { .attribute(mailPriority(6)) .build()); - Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator(); + Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator(); MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next(); mailQueueItem1.done(true); MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next(); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index a08318e..ae273f7 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -62,7 +62,7 @@ public class CassandraMailQueueMailDelete { void updateBrowseStart(MailQueueName mailQueueName) { findNewBrowseStart(mailQueueName) .flatMap(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart)) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index fd8317a..0d97c2b 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -89,14 +89,14 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta sendQueue = UnicastProcessor.create(); sendQueueHandle = sender .send(sendQueue) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(); Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); listener = DirectProcessor.create(); listenQueueHandle = receiver .consumeAutoAck(queueName) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .concatMap(this::toEvent) .subscribe(listener::onNext); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 34b3f5b..f0869c3 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -106,7 +106,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private void consumeWorkqueue() { receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(this::executeTask) .subscribe(); } @@ -140,7 +140,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { sendCancelRequestsQueue = UnicastProcessor.create(); sendCancelRequestsQueueHandle = cancelRequestSender .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage)) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } @@ -148,7 +148,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { RabbitFlux .createReceiver(new ReceiverOptions().connectionMono(connectionMono)) .consumeAutoAck(queueName) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .map(this::readCancelRequestMessage) .doOnNext(worker::cancelTask) .subscribe(); diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java index 9da45e8..a6a9ab9 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java @@ -62,7 +62,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract sendEvents(subscriber1, COMPLETED_EVENT); List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(this::collectEvents) .collectList() .block(); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java index ccc651b..fb5242c 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -141,7 +141,7 @@ public class MemoryTaskManager implements TaskManager { @Override public TaskExecutionDetails await(TaskId id, Duration timeout) throws TaskNotFoundException, ReachedTimeoutException { try { - return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) + return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.boundedElastic()) .map(ignored -> getExecutionDetails(id)) .filter(details -> details.getStatus().isFinished()) .blockFirst(timeout); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java index cea8b04..fe2f80b 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java @@ -37,7 +37,7 @@ public class MemoryWorkQueue implements WorkQueue { this.tasks = new LinkedBlockingQueue<>(); this.subscription = Mono.fromCallable(tasks::take) .repeat() - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMapSequential(this::dispatchTaskToWorker) .subscribe(); } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 4e281de..b91eeca 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -110,7 +110,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] .then(details) Flux.merge(findEvent, details) - .subscribeOn(Schedulers.elastic) + .subscribeOn(Schedulers.boundedElastic) .blockFirst(timeout) } catch { case _: IllegalStateException => throw new ReachedTimeoutException diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java index 882f6d7..705b291 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java @@ -100,7 +100,7 @@ public interface TerminationSubscriberContract { sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); List<List<Event>> listenedEvents = Flux.range(0, 2) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(ignored -> collectEvents(subscriber)) .collectList() .block(); @@ -117,7 +117,7 @@ public interface TerminationSubscriberContract { List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2))) .then(Mono.defer(() -> collectEvents(subscriber))) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .block(); assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT); } @@ -129,7 +129,7 @@ public interface TerminationSubscriberContract { default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) { return Flux.from(subscriber.listenEvents()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7))) .collectList(); } @@ -137,7 +137,7 @@ public interface TerminationSubscriberContract { default void sendEvents(TerminationSubscriber subscriber, Event... events) { Mono.delay(DELAY_BEFORE_PUBLISHING) .flatMapMany(ignored -> Flux.fromArray(events) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.boundedElastic()) .delayElements(DELAY_BETWEEN_EVENTS) .doOnNext(subscriber::handle)) .subscribe(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org