Repository: james-project
Updated Branches:
  refs/heads/master fbb8b42b8 -> f3140967a


JAMES-1945 Async retrier for Cassandra


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9200405d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9200405d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9200405d

Branch: refs/heads/master
Commit: 9200405d39805a0494d4e366545817f958af5cf2
Parents: fbb8b42
Author: benwa <[email protected]>
Authored: Wed Feb 22 14:26:32 2017 +0700
Committer: benwa <[email protected]>
Committed: Thu Feb 23 10:37:37 2017 +0700

----------------------------------------------------------------------
 .../utils/FunctionRunnerWithRetry.java          | 25 +++++-
 .../utils/FunctionRunnerWithRetryTest.java      | 89 ++++++++++++++++++++
 2 files changed, 112 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9200405d/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
index 314b81d..86e1aba 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
@@ -19,14 +19,18 @@
 
 package org.apache.james.backends.cassandra.utils;
 
-import com.google.common.base.Preconditions;
-
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
+import com.google.common.base.Preconditions;
+
 public class FunctionRunnerWithRetry {
 
+    public static class RelayingException extends RuntimeException {}
+
     @FunctionalInterface
     public interface OptionalSupplier<T> {
         Optional<T> getAsOptional();
@@ -54,4 +58,21 @@ public class FunctionRunnerWithRetry {
             .orElseThrow(() -> new LightweightTransactionException(maxRetry))
             .get();
     }
+
+    public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier) {
+        return executeAsyncAndRetrieveObject(futureSupplier, 0);
+    }
+
+    public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier, int tries) {
+        if (tries >= maxRetry) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        return futureSupplier.get()
+            .thenCompose(optional -> {
+                if (optional.isPresent()) {
+                    return CompletableFuture.completedFuture(optional);
+                }
+                return executeAsyncAndRetrieveObject(futureSupplier, tries + 1);
+            });
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/9200405d/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
index 2f85653..a2ae9a4 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
@@ -21,6 +21,10 @@ package org.apache.james.backends.cassandra.utils;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.lang.mutable.MutableInt;
 import org.junit.Test;
 
@@ -80,5 +84,90 @@ public class FunctionRunnerWithRetryTest {
         );
         assertThat(value.getValue()).isEqualTo(MAX_RETRY);
     }
+
+    @Test
+    public void asyncFunctionRunnerShouldWorkIfSucceedFirstTry() throws Exception {
+        int value = 18;
+
+        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
+            .executeAsyncAndRetrieveObject(
+                () -> CompletableFuture.completedFuture(Optional.of(value)))
+            .join();
+
+        assertThat(result).contains(value);
+    }
+
+    @Test
+    public void asyncFunctionRunnerShouldTryOnlyOnceIfSuccess() throws Exception {
+        int value = 18;
+        AtomicInteger times = new AtomicInteger(0);
+
+        new FunctionRunnerWithRetry(MAX_RETRY)
+            .executeAsyncAndRetrieveObject(
+                () -> {
+                    times.incrementAndGet();
+                    return CompletableFuture.completedFuture(Optional.of(value));
+                })
+            .join();
+
+        assertThat(times.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void asyncFunctionRunnerShouldRetrieveValueOnRetry() throws Exception {
+        int value = 18;
+        AtomicInteger times = new AtomicInteger(0);
+
+        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
+            .executeAsyncAndRetrieveObject(
+                () -> {
+                    int attemptCount = times.incrementAndGet();
+                    if (attemptCount == MAX_RETRY) {
+                        return CompletableFuture.completedFuture(Optional.of(value));
+                    } else {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                })
+            .join();
+
+        assertThat(result).contains(value);
+    }
+
+    @Test
+    public void asyncFunctionRunnerShouldMakeMaxRetryAttempts() throws Exception {
+        int value = 18;
+        AtomicInteger times = new AtomicInteger(0);
+
+        new FunctionRunnerWithRetry(MAX_RETRY)
+            .executeAsyncAndRetrieveObject(
+                () -> {
+                    int attemptCount = times.incrementAndGet();
+                    if (attemptCount == MAX_RETRY) {
+                        return CompletableFuture.completedFuture(Optional.of(value));
+                    } else {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                })
+            .join();
+
+        assertThat(times.get()).isEqualTo(MAX_RETRY);
+    }
+
+
+    @Test
+    public void asyncFunctionRunnerShouldReturnEmptyIfAllFailed() throws Exception {
+        AtomicInteger times = new AtomicInteger(0);
+
+        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
+            .executeAsyncAndRetrieveObject(
+                () -> {
+                    times.incrementAndGet();
+                    return CompletableFuture.completedFuture(Optional.<Integer>empty());
+                })
+            .join();
+
+        assertThat(result).isEmpty();
+        assertThat(times.get()).isEqualTo(MAX_RETRY);
+    }
     
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to