[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()

This retries getting a result until it matches a given predicate or
until we run out of retries.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d2788e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d2788e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d2788e3

Branch: refs/heads/release-1.5
Commit: 5d2788e378aedc6ee5133f080e70878d06e1743d
Parents: c486125
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Feb 27 13:40:51 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 11 08:31:56 2018 -0700

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 76 ++++++++++++++++++++
 1 file changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d2788e3/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index da77bdc..a2d0710 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.ExceptionUtils;
@@ -224,6 +225,81 @@ public class FutureUtils {
        }
 
        /**
+        * Retry the given operation with the given delay in between successful 
completions where the
+        * result does not match a given predicate.
+        *
+        * @param operation to retry
+        * @param retryDelay delay between retries
+        * @param deadline A deadline that specifies at what point we should 
stop retrying
+        * @param acceptancePredicate Predicate to test whether the result is 
acceptable
+        * @param scheduledExecutor executor to be used for the retry operation
+        * @param <T> type of the result
+        * @return Future which retries the given operation a given amount of 
times and delays the retry
+        *   in case the predicate isn't matched
+        */
+       public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+               final Supplier<CompletableFuture<T>> operation,
+               final Time retryDelay,
+               final Deadline deadline,
+               final Predicate<T> acceptancePredicate,
+               final ScheduledExecutor scheduledExecutor) {
+
+               final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
+
+               retrySuccessfulOperationWithDelay(
+                       resultFuture,
+                       operation,
+                       retryDelay,
+                       deadline,
+                       acceptancePredicate,
+                       scheduledExecutor);
+
+               return resultFuture;
+       }
+
+       private static <T> void retrySuccessfulOperationWithDelay(
+               final CompletableFuture<T> resultFuture,
+               final Supplier<CompletableFuture<T>> operation,
+               final Time retryDelay,
+               final Deadline deadline,
+               final Predicate<T> acceptancePredicate,
+               final ScheduledExecutor scheduledExecutor) {
+
+               if (!resultFuture.isDone()) {
+                       final CompletableFuture<T> operationResultFuture = 
operation.get();
+
+                       operationResultFuture.whenComplete(
+                               (t, throwable) -> {
+                                       if (throwable != null) {
+                                               if (throwable instanceof 
CancellationException) {
+                                                       
resultFuture.completeExceptionally(new RetryException("Operation future was 
cancelled.", throwable));
+                                               } else {
+                                                       
resultFuture.completeExceptionally(throwable);
+                                               }
+                                       } else {
+                                               if 
(acceptancePredicate.test(t)) {
+                                                       
resultFuture.complete(t);
+                                               } else if 
(deadline.hasTimeLeft()) {
+                                                       final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+                                                               () -> 
retrySuccessfulOperationWithDelay(resultFuture, operation, retryDelay, 
deadline, acceptancePredicate, scheduledExecutor),
+                                                               
retryDelay.toMilliseconds(),
+                                                               
TimeUnit.MILLISECONDS);
+
+                                                       
resultFuture.whenComplete(
+                                                               (innerT, 
innerThrowable) -> scheduledFuture.cancel(false));
+                                               } else {
+                                                       
resultFuture.completeExceptionally(
+                                                               new 
RetryException("Could not satisfy the predicate within the allowed time."));
+                                               }
+                                       }
+                               });
+
+                       resultFuture.whenComplete(
+                               (t, throwable) -> 
operationResultFuture.cancel(false));
+               }
+       }
+
+       /**
         * Exception with which the returned future is completed if the {@link 
#retry(Supplier, int, Executor)}
         * operation fails.
         */

Reply via email to