This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6e1887f9b76d4f7853c31a4c09fd0d424e0a4f57
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Feb 13 13:38:06 2019 +0700

    JAMES-2630 Rely on Reactor for connection retries
---
 .../modules/mailbox/CassandraSessionModule.java    |   9 --
 .../mailbox/ElasticSearchMailboxModule.java        |  18 +--
 .../modules/mailbox/ResilientClusterProvider.java  |  23 ++--
 .../mailbox/ScheduledExecutorServiceProvider.java  |  53 ---------
 .../apache/james/util/retry/RetryExecutorUtil.java |  44 -------
 .../james/util/retry/RetryExecutorUtilTest.java    | 130 ---------------------
 6 files changed, 21 insertions(+), 256 deletions(-)

diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 4a81d1e..ad9ab2a 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -21,7 +21,6 @@ package org.apache.james.modules.mailbox;
 import java.io.FileNotFoundException;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -57,7 +56,6 @@ import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
 public class CassandraSessionModule extends AbstractModule {
 
@@ -70,7 +68,6 @@ public class CassandraSessionModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(ScheduledExecutorService.class).toProvider(ScheduledExecutorServiceProvider.class);
         bind(CassandraUtils.class).in(Scopes.SINGLETON);
         bind(Session.class).toProvider(SessionWithInitializedTablesFactory.class);
         bind(Cluster.class).toProvider(ResilientClusterProvider.class);
@@ -116,12 +113,6 @@ public class CassandraSessionModule extends AbstractModule {
         }
     }
 
-
-    @Provides
-    private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) {
-        return new AsyncRetryExecutor(scheduler);
-    }
-
     @VisibleForTesting
     @Provides
     @Singleton
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index 4a72fe5..b50f0f6 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -22,8 +22,8 @@ package org.apache.james.modules.mailbox;
 import static org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher.DEFAULT_SEARCH_SIZE;
 
 import java.io.FileNotFoundException;
+import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import javax.inject.Named;
@@ -48,10 +48,8 @@ import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.apache.james.quota.search.elasticsearch.ElasticSearchQuotaConfiguration;
 import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil;
-import org.apache.james.util.retry.RetryExecutorUtil;
 import org.apache.james.utils.PropertiesProvider;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +57,9 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.multibindings.Multibinder;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class ElasticSearchMailboxModule extends AbstractModule {
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class);
@@ -137,12 +137,12 @@ public class ElasticSearchMailboxModule extends AbstractModule {
     @Singleton
     protected Client provideClient(ElasticSearchConfiguration configuration,
                                    ElasticSearchMailboxConfiguration mailboxConfiguration,
-                                   ElasticSearchQuotaConfiguration quotaConfiguration,
-                                   AsyncRetryExecutor executor) throws ExecutionException, InterruptedException {
+                                   ElasticSearchQuotaConfiguration quotaConfiguration) {
 
-        return RetryExecutorUtil.retryOnExceptions(executor, configuration.getMaxRetries(), configuration.getMinDelay(), NoNodeAvailableException.class)
-            .getWithRetry(context -> connectToCluster(configuration, mailboxConfiguration, quotaConfiguration))
-            .get();
+        return Mono.fromCallable(() -> connectToCluster(configuration, mailboxConfiguration, quotaConfiguration))
+            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelay()))
+            .publishOn(Schedulers.elastic())
+            .block();
     }
 
     private Client connectToCluster(ElasticSearchConfiguration configuration,
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
index ef2e4a8..f514675 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
@@ -19,8 +19,9 @@
 
 package org.apache.james.modules.mailbox;
 
+import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -30,15 +31,14 @@ import javax.inject.Singleton;
 import org.apache.james.backends.cassandra.init.ClusterBuilder;
 import org.apache.james.backends.cassandra.init.ClusterWithKeyspaceCreatedFactory;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
-import org.apache.james.util.retry.RetryExecutorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.collect.ImmutableList;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-import com.nurkiewicz.asyncretry.function.RetryCallable;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 @Singleton
 public class ResilientClusterProvider implements Provider<Cluster> {
@@ -47,17 +47,18 @@ public class ResilientClusterProvider implements Provider<Cluster> {
     private final Cluster cluster;
 
     @Inject
-    private ResilientClusterProvider(ClusterConfiguration configuration, AsyncRetryExecutor executor) throws ExecutionException, InterruptedException {
-        cluster = RetryExecutorUtil.retryOnExceptions(executor, configuration.getMaxRetry(), configuration.getMinDelay(), NoHostAvailableException.class)
-                .getWithRetry(getClusterRetryCallable(configuration))
-                .get();
+    private ResilientClusterProvider(ClusterConfiguration configuration) {
+        cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
+            .retryBackoff(configuration.getMaxRetry(), Duration.ofMillis(configuration.getMinDelay()))
+            .publishOn(Schedulers.elastic())
+            .block();
     }
 
-    private RetryCallable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration) {
+    private Callable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration) {
         LOGGER.info("Trying to connect to Cassandra service at {} (list {})", LocalDateTime.now(),
             ImmutableList.copyOf(configuration.getHosts()).toString());
 
-        return context -> {
+        return () -> {
             Cluster cluster = ClusterBuilder.builder()
                     .servers(configuration.getHosts())
                     .poolingOptions(configuration.getPoolingOptions())
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
deleted file mode 100644
index 56ec923..0000000
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.james.util.concurrent.NamedThreadFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Provider;
-
-@VisibleForTesting
-class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorService> {
-
-    private final ScheduledExecutorService scheduler;
-
-    @VisibleForTesting
-    ScheduledExecutorServiceProvider() {
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    }
-
-    @Override
-    public ScheduledExecutorService get() {
-        return scheduler;
-    }
-
-    @PreDestroy
-    private void stop() {
-        scheduler.shutdown();
-    }
-}
diff --git a/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java b/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java
deleted file mode 100644
index 34bdcf9..0000000
--- a/server/container/util/src/main/java/org/apache/james/util/retry/RetryExecutorUtil.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- * http://www.apache.org/licenses/LICENSE-2.0                   *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.util.retry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-
-public class RetryExecutorUtil {
-    private static final Logger LOG = LoggerFactory.getLogger(RetryExecutorUtil.class);
-
-    private static final int INITIAL_DELAY_MILLIS = 500;
-    private static final int MULTIPLIER = 2;
-
-    @SafeVarargs
-    public static AsyncRetryExecutor retryOnExceptions(AsyncRetryExecutor executor, int maxRetries, int minDelay, Class<? extends Throwable>... clazz) {
-        LOG.info("The action should retry when {} and retry to {} times if needed", clazz, maxRetries);
-        return executor
-            .withExponentialBackoff(INITIAL_DELAY_MILLIS, MULTIPLIER)
-            .withProportionalJitter()
-            .retryOn(clazz)
-            .withMaxRetries(maxRetries)
-            .withMinDelay(minDelay);
-    }
-
-}
diff --git a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
deleted file mode 100644
index 3af80ca..0000000
--- a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- * http://www.apache.org/licenses/LICENSE-2.0                   *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.util.retry;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.BDDMockito.given;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-import com.nurkiewicz.asyncretry.RetryExecutor;
-
-public class RetryExecutorUtilTest {
-    private static final int MAX_RETRIES = 3;
-    private static final int MIN_DELAY = 100;
-    @Mock
-    protected FaultyService serviceMock;
-
-    private RetryExecutor retryExecutor;
-    private ScheduledExecutorService scheduledExecutor;
-
-    @BeforeEach
-    void setUp() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    }
-
-    @AfterEach
-    void tearDown() throws Exception {
-        scheduledExecutor.shutdownNow();
-    }
-
-    @Test
-    void retryOnExceptionsAndExecuteShouldRethrowWhenScheduledServiceAlwaysThrowException() throws Exception {
-        given(serviceMock.faultyService())
-                .willThrow(IllegalArgumentException.class)
-                .willThrow(IllegalArgumentException.class)
-                .willThrow(IllegalArgumentException.class)
-                .willThrow(IllegalArgumentException.class);
-
-        retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class);
-
-        assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get())
-            .isInstanceOf(ExecutionException.class)
-            .hasCauseInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void retryOnExceptionsAndExecuteShouldRetryWhenMatchExceptionAndSuccess() throws Exception {
-        given(serviceMock.faultyService())
-                .willThrow(IllegalArgumentException.class)
-                .willReturn("Foo");
-        retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class);
-
-        final CompletableFuture<String> future = retryExecutor.getWithRetry(serviceMock::faultyService);
-
-        assertThat(future.get()).isEqualTo("Foo");
-    }
-
-    @Test
-    void retryOnExceptionsAndExecuteShouldNotRetryWhenDoesNotMatchException() throws Exception {
-        given(serviceMock.faultyService())
-                .willThrow(IllegalStateException.class)
-                .willReturn("Foo");
-
-        retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalArgumentException.class);
-
-        assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get())
-                .isInstanceOf(ExecutionException.class)
-                .hasCauseInstanceOf(IllegalStateException.class);
-    }
-
-    @Test
-    @SuppressWarnings("unchecked")
-    void retryOnExceptionsAndExecuteShouldRetryWithMaxTimesAndReturnValue() throws Exception {
-        given(serviceMock.faultyService())
-                .willThrow(IllegalStateException.class, IllegalStateException.class, IllegalStateException.class)
-                .willReturn("Foo");
-
-        retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalStateException.class);
-
-        CompletableFuture<String> future = retryExecutor.getWithRetry(serviceMock::faultyService);
-
-        assertThat(future.get()).isEqualTo("Foo");
-    }
-
-    @Test
-    @SuppressWarnings("unchecked")
-    void retryOnExceptionsAndExecuteShouldFailIfFailMoreThanMaxRetry() throws Exception {
-        given(serviceMock.faultyService())
-            .willThrow(IllegalStateException.class, IllegalStateException.class, IllegalStateException.class, IllegalStateException.class)
-            .willReturn("Foo");
-
-        retryExecutor = RetryExecutorUtil.retryOnExceptions(new AsyncRetryExecutor(scheduledExecutor), MAX_RETRIES, MIN_DELAY, IllegalStateException.class);
-
-        assertThatThrownBy(() -> retryExecutor.getWithRetry(serviceMock::faultyService).get())
-                .isInstanceOf(ExecutionException.class)
-                .hasCauseInstanceOf(IllegalStateException.class);
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to