This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e107a3b2dc4a41fc3abf5b124eab2fa28095ecfa
Author: RĂ©mi KOWALSKI <[email protected]>
AuthorDate: Mon Jan 27 15:31:54 2020 +0100

    JAMES-3028 add retry for reactor with async callback
---
 server/container/util/pom.xml                      |   9 +
 .../java/reactor/retry/RetryWithAsyncCallback.java | 270 ++++++++++++++
 .../test/java/reactor/retry/RetryTestUtils.java    | 122 +++++++
 .../reactor/retry/RetryWithAsyncCallbackTest.java  | 391 +++++++++++++++++++++
 4 files changed, 792 insertions(+)

diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index ab545bb..88876ea 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -72,6 +72,15 @@
             <artifactId>reactor-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>
diff --git 
a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java 
b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
new file mode 100644
index 0000000..33220bc
--- /dev/null
+++ 
b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
@@ -0,0 +1,270 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package reactor.retry;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.util.Logger;
+import reactor.util.Loggers;
+
+/**
+ * This class is a copy of reactor.retry.DefaultRetry.
+ * Its goal is to provide a way to execute an async action before retrying.
+ * To do so it provides a retryWithMono method which is the async equivalent 
of the synchronous method doOnRetry.
+ *
+ * This is a temporary solution as this new requirement has been exposed in an 
issue in the reactor project.
+ * see : https://github.com/reactor/reactor-addons/issues/220
+ *
+ */
+public class RetryWithAsyncCallback<T> extends AbstractRetry<T, Throwable> 
implements Retry<T> {
+
+    static final Logger log = Loggers.getLogger(RetryWithAsyncCallback.class);
+    static final Consumer<? super RetryContext<?>> NOOP_ON_RETRY = r -> { };
+    static final Function<? super RetryContext<?>, Mono<?>> NOOP_ON_RETRY_MONO 
= r -> Mono.empty();
+
+    /**
+     * Returns a retry function that retries any exception, once.
+     * More constraints may be added using {@link #retryMax(long)} or {@link 
#timeout(Duration)}.
+     *
+     * @return retry function that retries on any exception
+     */
+    public static <T> RetryWithAsyncCallback<T> any() {
+        return RetryWithAsyncCallback.<T>create(context -> true);
+    }
+
+    /**
+     * Returns a retry function that retries errors resulting from any of the
+     * specified exceptions, once.
+     * More constraints may be added using {@link #retryMax(long)}
+     * or {@link #timeout(Duration)}.
+     *
+     * @param retriableExceptions Exceptions that may be retried
+     * @return retry function that retries indefinitely, only for specified 
exceptions
+     */
+    @SafeVarargs
+    public static <T> RetryWithAsyncCallback<T> anyOf(Class<? extends 
Throwable>... retriableExceptions) {
+        Predicate<? super RetryContext<T>> predicate = context -> {
+            Throwable exception = context.exception();
+            if (exception == null) {
+                return true;
+            }
+            for (Class<? extends Throwable> clazz : retriableExceptions) {
+                if (clazz.isInstance(exception)) {
+                    return true;
+                }
+            }
+            return false;
+        };
+        return RetryWithAsyncCallback.<T>create(predicate);
+    }
+
+    /**
+     * Returns a retry function that retries errors resulting from all 
exceptions except
+     * the specified non-retriable exceptions, once.
+     * More constraints may be added using
+     * {@link #retryMax(long)} or {@link #timeout(Duration)}.
+     *
+     * @param nonRetriableExceptions exceptions that may not be retried
+     * @return retry function that retries all exceptions except the specified 
non-retriable exceptions.
+     */
+    @SafeVarargs
+    public static <T> RetryWithAsyncCallback<T> allBut(final Class<? extends 
Throwable>... nonRetriableExceptions) {
+        Predicate<? super RetryContext<T>> predicate = context -> {
+            Throwable exception = context.exception();
+            if (exception == null) {
+                return true;
+            }
+            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
+                if (clazz.isInstance(exception)) {
+                    return false;
+                }
+            }
+            return true;
+        };
+        return RetryWithAsyncCallback.<T>create(predicate);
+    }
+
+    /**
+     * Retry function that retries only if the predicate returns true, with no 
limit to
+     * the number of attempts.
+     * @param predicate Predicate that determines if next retry is performed
+     * @return Retry function with predicate
+     */
+    public static <T> RetryWithAsyncCallback<T> onlyIf(Predicate<? super 
RetryContext<T>> predicate) {
+        return 
RetryWithAsyncCallback.create(predicate).retryMax(Long.MAX_VALUE);
+    }
+
+    public static <T> RetryWithAsyncCallback<T> create(Predicate<? super 
RetryContext<T>> retryPredicate) {
+        return new RetryWithAsyncCallback<T>(retryPredicate,
+            Long.MAX_VALUE,
+            null,
+            Backoff.zero(),
+            Jitter.noJitter(),
+            null,
+            NOOP_ON_RETRY,
+            NOOP_ON_RETRY_MONO,
+            (T) null);
+    }
+
+    final Predicate<? super RetryContext<T>> retryPredicate;
+    final Consumer<? super RetryContext<T>> onRetry;
+    final Function<? super RetryContext<T>, Mono<?>> onRetryMono;
+
+    RetryWithAsyncCallback(Predicate<? super RetryContext<T>> retryPredicate,
+                           long maxIterations,
+                           Duration timeout,
+                           Backoff backoff,
+                           Jitter jitter,
+                           Scheduler backoffScheduler,
+                           final Consumer<? super RetryContext<T>> onRetry,
+                           Function<? super RetryContext<T>, Mono<?>> 
onRetryMono,
+                           T applicationContext) {
+        super(maxIterations, timeout, backoff, jitter, backoffScheduler, 
applicationContext);
+        this.retryPredicate = retryPredicate;
+        this.onRetry = onRetry;
+        this.onRetryMono = onRetryMono;
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> fixedBackoff(Duration backoffInterval) {
+        return backoff(Backoff.fixed(backoffInterval));
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> noBackoff() {
+        return backoff(Backoff.zero());
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> exponentialBackoff(Duration firstBackoff, 
Duration maxBackoff) {
+        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, 
false));
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> exponentialBackoffWithJitter(Duration 
firstBackoff, Duration maxBackoff) {
+        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, 
false)).jitter(Jitter.random());
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> randomBackoff(Duration firstBackoff, 
Duration maxBackoff) {
+        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 3, 
true)).jitter(Jitter.random());
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> withApplicationContext(T 
applicationContext) {
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> doOnRetry(Consumer<? super 
RetryContext<T>> onRetry) {
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    public RetryWithAsyncCallback<T> onRetryWithMono(Function<? super 
RetryContext<T>, Mono<?>> onRetryMono) {
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> retryOnce() {
+        return retryMax(1);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> retryMax(long maxIterations) {
+        if (maxIterations < 0) {
+            throw new IllegalArgumentException("maxIterations should be >= 0");
+        }
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> timeout(Duration timeout) {
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("timeout should be >= 0");
+        }
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> backoff(Backoff backoff) {
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> jitter(Jitter jitter) {
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, backoffScheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public RetryWithAsyncCallback<T> withBackoffScheduler(Scheduler scheduler) 
{
+        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, 
timeout,
+            backoff, jitter, scheduler, onRetry, onRetryMono, 
applicationContext);
+    }
+
+    @Override
+    public Publisher<Long> apply(Flux<Throwable> errors) {
+        Instant timeoutInstant = calculateTimeout();
+        DefaultContext<T> context = new DefaultContext<>(applicationContext, 
0L, null, null);
+        return errors.index()
+            .concatMap(tuple -> retry(tuple.getT2(), tuple.getT1() + 1L, 
timeoutInstant, context));
+    }
+
+    Publisher<Long> retry(Throwable e, long iteration, Instant timeoutInstant, 
DefaultContext<T> context) {
+        DefaultContext<T> tmpContext = new 
DefaultContext<>(applicationContext, iteration, context.lastBackoff, e);
+        BackoffDelay nextBackoff = calculateBackoff(tmpContext, 
timeoutInstant);
+        DefaultContext<T> retryContext = new 
DefaultContext<T>(applicationContext, iteration, nextBackoff, e);
+        context.lastBackoff = nextBackoff;
+
+        if (!retryPredicate.test(retryContext)) {
+            log.debug("Stopping retries since predicate returned false, retry 
context: {}", retryContext);
+            return Mono.error(e);
+        } else if (nextBackoff == RETRY_EXHAUSTED) {
+            log.debug("Retries exhausted, retry context: {}", retryContext);
+            return Mono.error(new RetryExhaustedException(e));
+        } else {
+            log.debug("Scheduling retry attempt, retry context: {}", 
retryContext);
+            onRetry.accept(retryContext);
+            return onRetryMono.apply(retryContext)
+                .then(Mono.from(retryMono(nextBackoff.delay())));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Retry{max=" + this.maxIterations + ",backoff=" + backoff + 
",jitter=" +
+            jitter + "}";
+    }
+}
diff --git 
a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java 
b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
new file mode 100644
index 0000000..075dcd9
--- /dev/null
+++ b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package reactor.retry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+
+public class RetryTestUtils {
+
+       static void assertDelays(Queue<? extends IterationContext<?>> retries, 
Long... delayMs) {
+               assertEquals(delayMs.length, retries.size());
+               int index = 0;
+               for (Iterator<? extends IterationContext<?>> it = 
retries.iterator(); it.hasNext(); ) {
+                       IterationContext<?> repeatContext = it.next();
+                       assertEquals(delayMs[index].longValue(), 
repeatContext.backoff().toMillis());
+                       index++;
+               }
+       }
+
+       static void assertRandomDelays(Queue<? extends IterationContext<?>> 
retries, int firstMs, int maxMs) {
+               long prevMs = 0;
+               int randomValues = 0;
+               for (IterationContext<?> context : retries) {
+                       long backoffMs = context.backoff().toMillis();
+                       assertTrue("Unexpected delay " + backoffMs, backoffMs 
>= firstMs && backoffMs <= maxMs);
+                       if (backoffMs != firstMs && backoffMs != prevMs)
+                               randomValues++;
+                       prevMs = backoffMs;
+               }
+               assertTrue("Delays not random", randomValues >= 2); // Allow 
for at most one edge case.
+       }
+
+       static <T> void testReuseInParallel(int threads, int iterations,
+                       Function<Backoff, Function<Flux<T>, Publisher<Long>>> 
retryOrRepeat,
+                       Consumer<Function<Flux<T>, Publisher<Long>>> testTask) 
throws Exception {
+               int repeatCount = iterations - 1;
+               AtomicInteger nextBackoff = new AtomicInteger();
+               // Keep track of the number of backoff invocations per instance
+               ConcurrentHashMap<Long, Integer> backoffCounts = new 
ConcurrentHashMap<>();
+               // Use a countdown latch to get all instances to stop in the 
first backoff callback
+               CountDownLatch latch = new CountDownLatch(threads);
+               Backoff customBackoff = context -> {
+                       Duration backoff = context.backoff();
+                       if (latch.getCount() > 0) {
+                               assertNull("Wrong context, backoff must be 
null", backoff);
+                               backoff = 
Duration.ofMillis(nextBackoff.incrementAndGet());
+                               backoffCounts.put(backoff.toMillis(), 1);
+                               latch.countDown();
+                               try {
+                                       latch.await(10, TimeUnit.SECONDS);
+                               }
+                               catch (Exception e) {
+                                       // ignore, errors are handled later
+                               }
+                       } else {
+                               assertNotNull("Wrong context, backoff must not 
be null", backoff);
+                               long index = backoff.toMillis();
+                               backoffCounts.put(index, 
backoffCounts.get(index) + 1);
+                       }
+                       return new BackoffDelay(backoff);
+               };
+               Function<Flux<T>, Publisher<Long>> retryFunc = 
retryOrRepeat.apply(customBackoff);
+               ExecutorService executor = 
Executors.newFixedThreadPool(threads);
+               List<Future<?>> futures = new ArrayList<>();
+               try {
+                       for (int i = 0; i < threads; i++) {
+                               Runnable runnable = () -> 
testTask.accept(retryFunc);
+                               futures.add(executor.submit(runnable));
+                       }
+                       for (Future<?> future : futures)
+                               future.get(5, TimeUnit.SECONDS);
+               }
+               finally {
+                       executor.shutdownNow();
+               }
+
+               assertEquals(0, latch.getCount());
+               assertEquals(threads, backoffCounts.size());
+               for (Integer count : backoffCounts.values()) {
+                       //backoff not invoked anymore when maxIteration reached
+                       assertEquals(repeatCount, count.intValue());
+               }
+       }
+}
diff --git 
a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
 
b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
new file mode 100644
index 0000000..2f2aad5
--- /dev/null
+++ 
b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
@@ -0,0 +1,391 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package reactor.retry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+public class RetryWithAsyncCallbackTest {
+
+    private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
+
+    @Test
+    public void shouldTimeoutRetryWithVirtualTime() {
+        // given
+        final int minBackoff = 1;
+        final int maxBackoff = 5;
+        final int timeout = 10;
+
+        // then
+        StepVerifier.withVirtualTime(() ->
+            Mono.<String>error(new RuntimeException("Something went wrong"))
+                .retryWhen(RetryWithAsyncCallback.anyOf(Exception.class)
+                    
.exponentialBackoffWithJitter(Duration.ofSeconds(minBackoff), 
Duration.ofSeconds(maxBackoff))
+                    .timeout(Duration.ofSeconds(timeout)))
+                .subscribeOn(Schedulers.elastic()))
+            .expectSubscription()
+//                             .expectNoEvent(Duration.ofSeconds(timeout))
+            .thenAwait(Duration.ofSeconds(timeout))
+            .expectError(RetryExhaustedException.class)
+            .verify(Duration.ofSeconds(timeout));
+    }
+
+    @Test
+    public void fluxRetryNoBackoff() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IOException()))
+            
.retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1, 0, 1)
+            .verifyError(RetryExhaustedException.class);
+        assertRetries(IOException.class, IOException.class);
+        RetryTestUtils.assertDelays(retries, 0L, 0L);
+    }
+
+    @Test
+    public void monoRetryNoBackoff() {
+        Mono<?> mono = Mono.error(new IOException())
+            
.retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
+
+        StepVerifier.create(mono)
+            .verifyError(RetryExhaustedException.class);
+        assertRetries(IOException.class, IOException.class);
+        RetryTestUtils.assertDelays(retries, 0L, 0L);
+    }
+
+    @Test
+    public void fluxRetryFixedBackoff() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IOException()))
+            
.retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
+
+        StepVerifier.withVirtualTime(() -> flux)
+            .expectNext(0, 1)
+            .expectNoEvent(Duration.ofMillis(300))
+            .thenAwait(Duration.ofMillis(300))
+            .expectNext(0, 1)
+            .verifyError(RetryExhaustedException.class);
+        assertRetries(IOException.class);
+        RetryTestUtils.assertDelays(retries, 500L);
+    }
+
+    @Test
+    public void monoRetryFixedBackoff() {
+        Mono<?> mono = Mono.error(new IOException())
+            
.retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
+
+        StepVerifier.withVirtualTime(() -> mono)
+            .expectSubscription()
+            .expectNoEvent(Duration.ofMillis(300))
+            .thenAwait(Duration.ofMillis(300))
+            .verifyError(RetryExhaustedException.class);
+
+        assertRetries(IOException.class);
+        RetryTestUtils.assertDelays(retries, 500L);
+    }
+
+
+    @Test
+    public void fluxRetryExponentialBackoff() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IOException()))
+            .retryWhen(RetryWithAsyncCallback.any()
+                .exponentialBackoff(Duration.ofMillis(100), 
Duration.ofMillis(500))
+                .timeout(Duration.ofMillis(1500))
+                .doOnRetry(onRetry()));
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1)
+            .expectNoEvent(Duration.ofMillis(50))  // delay=100
+            .expectNext(0, 1)
+            .expectNoEvent(Duration.ofMillis(150)) // delay=200
+            .expectNext(0, 1)
+            .expectNoEvent(Duration.ofMillis(250)) // delay=400
+            .expectNext(0, 1)
+            .expectNoEvent(Duration.ofMillis(450)) // delay=500
+            .expectNext(0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
+
+        assertRetries(IOException.class, IOException.class, IOException.class, 
IOException.class);
+        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
+    }
+    @Test
+    public void monoRetryExponentialBackoff() {
+        Mono<?> mono = Mono.error(new IOException())
+            .retryWhen(RetryWithAsyncCallback.any()
+                .exponentialBackoff(Duration.ofMillis(100), 
Duration.ofMillis(500))
+                .retryMax(4)
+                .doOnRetry(onRetry()));
+
+        StepVerifier.withVirtualTime(() -> mono)
+            .expectSubscription()
+            .thenAwait(Duration.ofMillis(100))
+            .thenAwait(Duration.ofMillis(200))
+            .thenAwait(Duration.ofMillis(400))
+            .thenAwait(Duration.ofMillis(500))
+            .verifyError(RetryExhaustedException.class);
+
+        assertRetries(IOException.class, IOException.class, IOException.class, 
IOException.class);
+        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
+    }
+
+    @Test
+    public void fluxRetryRandomBackoff() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IOException()))
+            .retryWhen(RetryWithAsyncCallback.any()
+                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
+                .retryMax(4)
+                .doOnRetry(onRetry()));
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1, 0, 1, 0, 1, 0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
+
+        assertRetries(IOException.class, IOException.class, IOException.class, 
IOException.class);
+        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
+    }
+
+    @Test
+    public void monoRetryRandomBackoff() {
+        Mono<?> mono = Mono.error(new IOException())
+            .retryWhen(RetryWithAsyncCallback.any()
+                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
+                .retryMax(4)
+                .doOnRetry(onRetry()));
+
+        StepVerifier.withVirtualTime(() -> mono)
+            .expectSubscription()
+            .thenAwait(Duration.ofMillis(100))
+            .thenAwait(Duration.ofMillis(2000))
+            .thenAwait(Duration.ofMillis(2000))
+            .thenAwait(Duration.ofMillis(2000))
+            .verifyError(RetryExhaustedException.class);
+
+        assertRetries(IOException.class, IOException.class, IOException.class, 
IOException.class);
+        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
+    }
+
+
+    @Test
+    public void fluxRetriableExceptions() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
SocketException()))
+            
.retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+
+        Flux<Integer> nonRetriable = Flux.concat(Flux.range(0, 2), 
Flux.error(new RuntimeException()))
+            
.retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
+        StepVerifier.create(nonRetriable)
+            .expectNext(0, 1)
+            .verifyError(RuntimeException.class);
+
+    }
+
+    @Test
+    public void fluxNonRetriableExceptions() {
+
+        Retry<?> retry = 
RetryWithAsyncCallback.allBut(RuntimeException.class).retryOnce().doOnRetry(onRetry());
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IllegalStateException())).retryWhen(retry);
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1)
+            .verifyError(IllegalStateException.class);
+
+
+        Flux<Integer> retriable = Flux.concat(Flux.range(0, 2), Flux.error(new 
SocketException())).retryWhen(retry);
+        StepVerifier.create(retriable)
+            .expectNext(0, 1, 0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+    }
+
+    @Test
+    public void fluxRetryAnyException() {
+        Retry<?> retry = 
RetryWithAsyncCallback.any().retryOnce().doOnRetry(onRetry());
+
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
SocketException())).retryWhen(retry);
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+
+        Flux<Integer> flux2 = Flux.concat(Flux.range(0, 2), Flux.error(new 
RuntimeException())).retryWhen(retry);
+        StepVerifier.create(flux2)
+            .expectNext(0, 1, 0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
RuntimeException.class));
+
+    }
+
+    @Test
+    public void fluxRetryOnPredicate() {
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
SocketException()))
+            .retryWhen(RetryWithAsyncCallback.onlyIf(context -> 
context.iteration() < 3).doOnRetry(onRetry()));
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1, 0, 1)
+            .verifyError(SocketException.class);
+    }
+
+
+    @Test
+    public void doOnRetry() {
+        Semaphore semaphore = new Semaphore(0);
+        Retry<?> retry = RetryWithAsyncCallback.any()
+            .retryOnce()
+            .fixedBackoff(Duration.ofMillis(500))
+            .doOnRetry(context -> semaphore.release());
+
+        StepVerifier.withVirtualTime(() -> Flux.range(0, 
2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
+            .expectNext(0, 1)
+            .then(semaphore::acquireUninterruptibly)
+            .expectNoEvent(Duration.ofMillis(400))
+            .thenAwait(Duration.ofMillis(200))
+            .expectNext(0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+
+        StepVerifier.withVirtualTime(() -> Mono.error(new 
SocketException()).retryWhen(retry.noBackoff()))
+            .then(semaphore::acquireUninterruptibly)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+    }
+
+    @Test
+    public void onRetryWithMono() {
+        Semaphore semaphore = new Semaphore(0);
+        Retry<?> retry = RetryWithAsyncCallback.any()
+            .retryOnce()
+            .fixedBackoff(Duration.ofMillis(500))
+            .onRetryWithMono(context -> Mono.fromCallable(() -> { 
semaphore.release(); return 0; }));
+
+        StepVerifier.withVirtualTime(() -> Flux.range(0, 
2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
+            .expectNext(0, 1)
+            .then(semaphore::acquireUninterruptibly)
+            .expectNoEvent(Duration.ofMillis(400))
+            .thenAwait(Duration.ofMillis(200))
+            .expectNext(0, 1)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+
+        StepVerifier.withVirtualTime(() -> Mono.error(new 
SocketException()).retryWhen(retry.noBackoff()))
+            .then(semaphore::acquireUninterruptibly)
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class));
+    }
+
+    @Test
+    public void retryApplicationContext() {
+        class AppContext {
+            boolean needsRollback;
+            void rollback() {
+                needsRollback = false;
+            }
+            void run() {
+                assertFalse("Rollback not performed", needsRollback);
+                needsRollback = true;
+            }
+        }
+        AppContext appContext = new AppContext();
+        Retry<?> retry = 
RetryWithAsyncCallback.<AppContext>any().withApplicationContext(appContext)
+            .retryMax(2)
+            .doOnRetry(context -> {
+                AppContext ac = context.applicationContext();
+                assertNotNull("Application context not propagated", ac);
+                ac.rollback();
+            });
+
+        StepVerifier.withVirtualTime(() -> Mono.error(new 
RuntimeException()).doOnNext(i -> appContext.run()).retryWhen(retry))
+            .verifyErrorMatches(e -> isRetryExhausted(e, 
RuntimeException.class));
+
+    }
+
+    @Test
+    public void fluxRetryCompose() {
+        Retry<?> retry = 
RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
+        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new 
IOException())).as(retry::apply);
+
+        StepVerifier.create(flux)
+            .expectNext(0, 1, 0, 1, 0, 1)
+            .verifyError(RetryExhaustedException.class);
+        assertRetries(IOException.class, IOException.class);
+    }
+
+    @Test
+    public void monoRetryCompose() {
+        Retry<?> retry = 
RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
+        Flux<?> flux = Mono.error(new IOException()).as(retry::apply);
+
+        StepVerifier.create(flux)
+            .verifyError(RetryExhaustedException.class);
+        assertRetries(IOException.class, IOException.class);
+    }
+
+    @Test
+    public void functionReuseInParallel() throws Exception {
+        int retryCount = 19;
+        int range = 100;
+        Integer[] values = new Integer[(retryCount + 1) * range];
+        for (int i = 0; i <= retryCount; i++) {
+            for (int j = 1; j <= range; j++)
+                values[i * range + j - 1] = j;
+        }
+        RetryTestUtils.testReuseInParallel(2, 20,
+            backoff -> 
RetryWithAsyncCallback.<Integer>any().retryMax(19).backoff(backoff),
+            retryFunc -> StepVerifier.create(Flux.range(1, 
range).concatWith(Mono.error(new SocketException())).retryWhen(retryFunc))
+                .expectNext(values)
+                .verifyErrorMatches(e -> isRetryExhausted(e, 
SocketException.class)));
+    }
+
+    Consumer<? super RetryContext<?>> onRetry() {
+        return context -> retries.add(context);
+    }
+
+    @SafeVarargs
+    private final void assertRetries(Class<? extends Throwable>... exceptions) 
{
+        assertEquals(exceptions.length, retries.size());
+        int index = 0;
+        for (Iterator<RetryContext<?>> it = retries.iterator(); it.hasNext(); 
) {
+            RetryContext<?> retryContext = it.next();
+            assertEquals(index + 1, retryContext.iteration());
+            assertEquals(exceptions[index], 
retryContext.exception().getClass());
+            index++;
+        }
+    }
+
+    static boolean isRetryExhausted(Throwable e, Class<? extends Throwable> 
cause) {
+        return e instanceof RetryExhaustedException && 
cause.isInstance(e.getCause());
+    }
+
+    @Test
+    public void retryToString() {
+        
System.out.println(RetryWithAsyncCallback.any().noBackoff().retryMax(2).toString());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to