[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay() (declared in the patch as retrySuccesfulWithDelay). This retries getting a result until it matches a given predicate or until the given deadline has passed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d2788e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d2788e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d2788e3 Branch: refs/heads/release-1.5 Commit: 5d2788e378aedc6ee5133f080e70878d06e1743d Parents: c486125 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Feb 27 13:40:51 2018 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Mar 11 08:31:56 2018 -0700 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 76 ++++++++++++++++++++ 1 file changed, 76 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5d2788e3/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index da77bdc..a2d0710 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.concurrent; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.ExceptionUtils; @@ -224,6 +225,81 @@ public class FutureUtils { } /** + * Retry the given operation with the given delay in between successful completions where the + * result does not match a given predicate. 
+ * + * @param operation to retry + * @param retryDelay delay between retries + * @param deadline A deadline that specifies at what point we should stop retrying + * @param acceptancePredicate Predicate to test whether the result is acceptable + * @param scheduledExecutor executor to be used for the retry operation + * @param <T> type of the result + * @return Future which retries the given operation a given amount of times and delays the retry + * in case the predicate isn't matched + */ + public static <T> CompletableFuture<T> retrySuccesfulWithDelay( + final Supplier<CompletableFuture<T>> operation, + final Time retryDelay, + final Deadline deadline, + final Predicate<T> acceptancePredicate, + final ScheduledExecutor scheduledExecutor) { + + final CompletableFuture<T> resultFuture = new CompletableFuture<>(); + + retrySuccessfulOperationWithDelay( + resultFuture, + operation, + retryDelay, + deadline, + acceptancePredicate, + scheduledExecutor); + + return resultFuture; + } + + private static <T> void retrySuccessfulOperationWithDelay( + final CompletableFuture<T> resultFuture, + final Supplier<CompletableFuture<T>> operation, + final Time retryDelay, + final Deadline deadline, + final Predicate<T> acceptancePredicate, + final ScheduledExecutor scheduledExecutor) { + + if (!resultFuture.isDone()) { + final CompletableFuture<T> operationResultFuture = operation.get(); + + operationResultFuture.whenComplete( + (t, throwable) -> { + if (throwable != null) { + if (throwable instanceof CancellationException) { + resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable)); + } else { + resultFuture.completeExceptionally(throwable); + } + } else { + if (acceptancePredicate.test(t)) { + resultFuture.complete(t); + } else if (deadline.hasTimeLeft()) { + final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( + () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retryDelay, deadline, acceptancePredicate, 
scheduledExecutor), + retryDelay.toMilliseconds(), + TimeUnit.MILLISECONDS); + + resultFuture.whenComplete( + (innerT, innerThrowable) -> scheduledFuture.cancel(false)); + } else { + resultFuture.completeExceptionally( + new RetryException("Could not satisfy the predicate within the allowed time.")); + } + } + }); + + resultFuture.whenComplete( + (t, throwable) -> operationResultFuture.cancel(false)); + } + } + + /** * Exception with which the returned future is completed if the {@link #retry(Supplier, int, Executor)} * operation fails. */