This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6e1887f9b76d4f7853c31a4c09fd0d424e0a4f57 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Feb 13 13:38:06 2019 +0700 JAMES-2630 Rely on Reactor for connection retries --- .../modules/mailbox/CassandraSessionModule.java | 9 -- .../mailbox/ElasticSearchMailboxModule.java | 18 +-- .../modules/mailbox/ResilientClusterProvider.java | 23 ++-- .../mailbox/ScheduledExecutorServiceProvider.java | 53 --------- .../apache/james/util/retry/RetryExecutorUtil.java | 44 ------- .../james/util/retry/RetryExecutorUtilTest.java | 130 --------------------- 6 files changed, 21 insertions(+), 256 deletions(-) diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java index 4a81d1e..ad9ab2a 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java @@ -21,7 +21,6 @@ package org.apache.james.modules.mailbox; import java.io.FileNotFoundException; import java.util.List; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -57,7 +56,6 @@ import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.Singleton; import com.google.inject.multibindings.Multibinder; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; public class CassandraSessionModule extends AbstractModule { @@ -70,7 +68,6 @@ public class CassandraSessionModule extends AbstractModule { @Override protected void configure() { - bind(ScheduledExecutorService.class).toProvider(ScheduledExecutorServiceProvider.class); bind(CassandraUtils.class).in(Scopes.SINGLETON); bind(Session.class).toProvider(SessionWithInitializedTablesFactory.class); bind(Cluster.class).toProvider(ResilientClusterProvider.class); @@ -116,12 +113,6 @@ public class CassandraSessionModule extends AbstractModule { } } - - @Provides - private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) { - return new AsyncRetryExecutor(scheduler); - } - @VisibleForTesting @Provides @Singleton diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java index 4a72fe5..b50f0f6 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java @@ -22,8 +22,8 @@ package org.apache.james.modules.mailbox; import static org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher.DEFAULT_SEARCH_SIZE; import java.io.FileNotFoundException; +import java.time.Duration; import java.time.LocalDateTime; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import javax.inject.Named; @@ -48,10 +48,8 @@ import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.apache.james.quota.search.elasticsearch.ElasticSearchQuotaConfiguration; import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil; -import org.apache.james.util.retry.RetryExecutorUtil; import org.apache.james.utils.PropertiesProvider; import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.NoNodeAvailableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +57,9 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.multibindings.Multibinder; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class ElasticSearchMailboxModule extends AbstractModule { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class); @@ -137,12 +137,12 @@ public class ElasticSearchMailboxModule extends AbstractModule { @Singleton protected Client provideClient(ElasticSearchConfiguration configuration, ElasticSearchMailboxConfiguration mailboxConfiguration, - ElasticSearchQuotaConfiguration quotaConfiguration, - AsyncRetryExecutor executor) throws ExecutionException, InterruptedException { + ElasticSearchQuotaConfiguration quotaConfiguration) { - return RetryExecutorUtil.retryOnExceptions(executor, configuration.getMaxRetries(), configuration.getMinDelay(), NoNodeAvailableException.class) - .getWithRetry(context -> connectToCluster(configuration, mailboxConfiguration, quotaConfiguration)) - .get(); + return Mono.fromCallable(() -> connectToCluster(configuration, mailboxConfiguration, quotaConfiguration)) + .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelay())) + .publishOn(Schedulers.elastic()) + .block(); } private Client connectToCluster(ElasticSearchConfiguration configuration, diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java index ef2e4a8..f514675 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java @@ -19,8 +19,9 @@ package org.apache.james.modules.mailbox; +import java.time.Duration; import java.time.LocalDateTime; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -30,15 +31,14 @@ import javax.inject.Singleton; import org.apache.james.backends.cassandra.init.ClusterBuilder; import org.apache.james.backends.cassandra.init.ClusterWithKeyspaceCreatedFactory; import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration; -import org.apache.james.util.retry.RetryExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.collect.ImmutableList; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -import com.nurkiewicz.asyncretry.function.RetryCallable; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @Singleton public class ResilientClusterProvider implements Provider<Cluster> { @@ -47,17 +47,18 @@ public class ResilientClusterProvider implements Provider<Cluster> { private final Cluster cluster; @Inject - private ResilientClusterProvider(ClusterConfiguration configuration, AsyncRetryExecutor executor) throws ExecutionException, InterruptedException { - cluster = RetryExecutorUtil.retryOnExceptions(executor, configuration.getMaxRetry(), configuration.getMinDelay(), NoHostAvailableException.class) - .getWithRetry(getClusterRetryCallable(configuration)) - .get(); + private ResilientClusterProvider(ClusterConfiguration configuration) { + cluster = Mono.fromCallable(getClusterRetryCallable(configuration)) + .retryBackoff(configuration.getMaxRetry(), Duration.ofMillis(configuration.getMinDelay())) + .publishOn(Schedulers.elastic()) + .block(); } - private RetryCallable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration) { + private Callable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration) { LOGGER.info("Trying to connect to Cassandra service at {} (list {})", LocalDateTime.now(), ImmutableList.copyOf(configuration.getHosts()).toString()); - return context -> { + return () -> { Cluster cluster = ClusterBuilder.builder() .servers(configuration.getHosts()) .poolingOptions(configuration.getPoolingOptions()) diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java deleted file mode 100644 index 56ec923..0000000 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java +++ /dev/null @@ -1,53 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.modules.mailbox; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; - -import javax.annotation.PreDestroy; - -import org.apache.james.util.concurrent.NamedThreadFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.inject.Provider; - -@VisibleForTesting -class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorService> { - - private final ScheduledExecutorService scheduler; - - @VisibleForTesting - ScheduledExecutorServiceProvider() { - ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass()); - scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); - } - - @Override - public ScheduledExecutorService get() { - return scheduler; - } - - @PreDestroy - private void stop() { - scheduler.shutdown(); - } -} diff --git a/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java b/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java deleted file mode 100644 index 34bdcf9..0000000 --- a/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java +++ /dev/null @@ -1,44 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.util.retry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; - -public class RetryExecutorUtil { - private static final Logger LOG = LoggerFactory.getLogger(RetryExecutorUtil.class); - - private static final int INITIAL_DELAY_MILLIS = 500; - private static final int MULTIPLIER = 2; - - @SafeVarargs - public static AsyncRetryExecutor retryOnExceptions(AsyncRetryExecutor executor, int maxRetries, int minDelay, Class<? extends Throwable>... clazz) { - LOG.info("The action should retry when {} and retry to {} times if needed", clazz, maxRetries); - return executor - .withExponentialBackoff(INITIAL_DELAY_MILLIS, MULTIPLIER) - .withProportionalJitter() - .retryOn(clazz) - .withMaxRetries(maxRetries) - .withMinDelay(minDelay); - } - -} diff --git a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java deleted file mode 100644 index 3af80ca..0000000 --- a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.util.retry; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.BDDMockito.given; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; - -import org.apache.james.util.concurrent.NamedThreadFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -import com.nurkiewicz.asyncretry.RetryExecutor; - -public class RetryExecutorUtilTest { - private static final int MAX_RETRIES = 3; - private static final int MIN_DELAY = 100; - @Mock - protected FaultyService serviceMock; - - private RetryExecutor retryExecutor; - private ScheduledExecutorService scheduledExecutor; - - @BeforeEach - void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass()); - scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); - } - - @AfterEach - void tearDown() throws Exception { - scheduledExecutor.shutdownNow(); - } - - @Test - void retryOnExceptionsAndExecuteShouldRethrowWhenScheduledServiceAlwaysThrowException() throws Exception { - given(serviceMock.faultyService()) - .willThrow(IllegalArgumentException.class) - .willThrow(IllegalArgumentException.class) - .willThrow(IllegalArgumentException.class) - .willThrow(IllegalArgumentException.class); - - retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class); - - assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get()) - .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(IllegalArgumentException.class); - } - - @Test - void retryOnExceptionsAndExecuteShouldRetryWhenMatchExceptionAndSuccess() throws Exception { - given(serviceMock.faultyService()) - .willThrow(IllegalArgumentException.class) - .willReturn("Foo"); - retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class); - - final CompletableFuture<String> future = retryExecutor.getWithRetry(serviceMock::faultyService); - - assertThat(future.get()).isEqualTo("Foo"); - } - - @Test - void retryOnExceptionsAndExecuteShouldNotRetryWhenDoesNotMatchException() throws Exception { - given(serviceMock.faultyService()) - .willThrow(IllegalStateException.class) - .willReturn("Foo"); - - retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class); - - assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get()) - .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(IllegalStateException.class); - } - - @Test - @SuppressWarnings("unchecked") - void retryOnExceptionsAndExecuteShouldRetryWithMaxTimesAndReturnValue() throws Exception { - given(serviceMock.faultyService()) - .willThrow(IllegalStateException.class, IllegalStateException.class, IllegalStateException.class) - .willReturn("Foo"); - - retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalStateException.class); - - CompletableFuture<String> future = retryExecutor.getWithRetry(serviceMock::faultyService); - - assertThat(future.get()).isEqualTo("Foo"); - } - - @Test - @SuppressWarnings("unchecked") - void retryOnExceptionsAndExecuteShouldFailIfFailMoreThanMaxRetry() throws Exception { - given(serviceMock.faultyService()) - .willThrow(IllegalStateException.class, IllegalStateException.class, IllegalStateException.class, IllegalStateException.class) - .willReturn("Foo"); - - retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalStateException.class); - - assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get()) - .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(IllegalStateException.class); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
