This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e107a3b2dc4a41fc3abf5b124eab2fa28095ecfa Author: Rémi KOWALSKI <[email protected]> AuthorDate: Mon Jan 27 15:31:54 2020 +0100 JAMES-3028 add retry for reactor with async callback --- server/container/util/pom.xml | 9 + .../java/reactor/retry/RetryWithAsyncCallback.java | 270 ++++++++++++++ .../test/java/reactor/retry/RetryTestUtils.java | 122 +++++++ .../reactor/retry/RetryWithAsyncCallbackTest.java | 391 +++++++++++++++++++++ 4 files changed, 792 insertions(+) diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml index ab545bb..88876ea 100644 --- a/server/container/util/pom.xml +++ b/server/container/util/pom.xml @@ -72,6 +72,15 @@ <artifactId>reactor-core</artifactId> </dependency> <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> diff --git a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java new file mode 100644 index 0000000..33220bc --- /dev/null +++ b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java @@ -0,0 +1,270 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package reactor.retry; + +import java.time.Duration; +import java.time.Instant; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.util.Logger; +import reactor.util.Loggers; + +/** + * This class is a copy of reactor.retry.DefaultRetry. + * Its goal is to provide a way to execute an async action before retrying. + * To do so it provides a retryWithMono method which is the async equivalent of the synchronous method doOnRetry. + * + * This is a temporary solution as this new requirement has been exposed in an issue in the reactor project. + * see : https://github.com/reactor/reactor-addons/issues/220 + * + */ +public class RetryWithAsyncCallback<T> extends AbstractRetry<T, Throwable> implements Retry<T> { + + static final Logger log = Loggers.getLogger(RetryWithAsyncCallback.class); + static final Consumer<? super RetryContext<?>> NOOP_ON_RETRY = r -> { }; + static final Function<? super RetryContext<?>, Mono<?>> NOOP_ON_RETRY_MONO = r -> Mono.empty(); + + /** + * Returns a retry function that retries any exception, once. + * More constraints may be added using {@link #retryMax(long)} or {@link #timeout(Duration)}. + * + * @return retry function that retries on any exception + */ + public static <T> RetryWithAsyncCallback<T> any() { + return RetryWithAsyncCallback.<T>create(context -> true); + } + + /** + * Returns a retry function that retries errors resulting from any of the + * specified exceptions, once. + * More constraints may be added using {@link #retryMax(long)} + * or {@link #timeout(Duration)}. + * + * @param retriableExceptions Exceptions that may be retried + * @return retry function that retries indefinitely, only for specified exceptions + */ + @SafeVarargs + public static <T> RetryWithAsyncCallback<T> anyOf(Class<? extends Throwable>... retriableExceptions) { + Predicate<? super RetryContext<T>> predicate = context -> { + Throwable exception = context.exception(); + if (exception == null) { + return true; + } + for (Class<? extends Throwable> clazz : retriableExceptions) { + if (clazz.isInstance(exception)) { + return true; + } + } + return false; + }; + return RetryWithAsyncCallback.<T>create(predicate); + } + + /** + * Returns a retry function that retries errors resulting from all exceptions except + * the specified non-retriable exceptions, once. + * More constraints may be added using + * {@link #retryMax(long)} or {@link #timeout(Duration)}. + * + * @param nonRetriableExceptions exceptions that may not be retried + * @return retry function that retries all exceptions except the specified non-retriable exceptions. + */ + @SafeVarargs + public static <T> RetryWithAsyncCallback<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) { + Predicate<? super RetryContext<T>> predicate = context -> { + Throwable exception = context.exception(); + if (exception == null) { + return true; + } + for (Class<? extends Throwable> clazz : nonRetriableExceptions) { + if (clazz.isInstance(exception)) { + return false; + } + } + return true; + }; + return RetryWithAsyncCallback.<T>create(predicate); + } + + /** + * Retry function that retries only if the predicate returns true, with no limit to + * the number of attempts. + * @param predicate Predicate that determines if next retry is performed + * @return Retry function with predicate + */ + public static <T> RetryWithAsyncCallback<T> onlyIf(Predicate<? super RetryContext<T>> predicate) { + return RetryWithAsyncCallback.create(predicate).retryMax(Long.MAX_VALUE); + } + + public static <T> RetryWithAsyncCallback<T> create(Predicate<? super RetryContext<T>> retryPredicate) { + return new RetryWithAsyncCallback<T>(retryPredicate, + Long.MAX_VALUE, + null, + Backoff.zero(), + Jitter.noJitter(), + null, + NOOP_ON_RETRY, + NOOP_ON_RETRY_MONO, + (T) null); + } + + final Predicate<? super RetryContext<T>> retryPredicate; + final Consumer<? super RetryContext<T>> onRetry; + final Function<? super RetryContext<T>, Mono<?>> onRetryMono; + + RetryWithAsyncCallback(Predicate<? super RetryContext<T>> retryPredicate, + long maxIterations, + Duration timeout, + Backoff backoff, + Jitter jitter, + Scheduler backoffScheduler, + final Consumer<? super RetryContext<T>> onRetry, + Function<? super RetryContext<T>, Mono<?>> onRetryMono, + T applicationContext) { + super(maxIterations, timeout, backoff, jitter, backoffScheduler, applicationContext); + this.retryPredicate = retryPredicate; + this.onRetry = onRetry; + this.onRetryMono = onRetryMono; + } + + @Override + public RetryWithAsyncCallback<T> fixedBackoff(Duration backoffInterval) { + return backoff(Backoff.fixed(backoffInterval)); + } + + @Override + public RetryWithAsyncCallback<T> noBackoff() { + return backoff(Backoff.zero()); + } + + @Override + public RetryWithAsyncCallback<T> exponentialBackoff(Duration firstBackoff, Duration maxBackoff) { + return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false)); + } + + @Override + public RetryWithAsyncCallback<T> exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff) { + return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false)).jitter(Jitter.random()); + } + + @Override + public RetryWithAsyncCallback<T> randomBackoff(Duration firstBackoff, Duration maxBackoff) { + return backoff(Backoff.exponential(firstBackoff, maxBackoff, 3, true)).jitter(Jitter.random()); + } + + @Override + public RetryWithAsyncCallback<T> withApplicationContext(T applicationContext) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> doOnRetry(Consumer<? super RetryContext<T>> onRetry) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + public RetryWithAsyncCallback<T> onRetryWithMono(Function<? super RetryContext<T>, Mono<?>> onRetryMono) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> retryOnce() { + return retryMax(1); + } + + @Override + public RetryWithAsyncCallback<T> retryMax(long maxIterations) { + if (maxIterations < 0) { + throw new IllegalArgumentException("maxIterations should be >= 0"); + } + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> timeout(Duration timeout) { + if (timeout.isNegative()) { + throw new IllegalArgumentException("timeout should be >= 0"); + } + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> backoff(Backoff backoff) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> jitter(Jitter jitter) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public RetryWithAsyncCallback<T> withBackoffScheduler(Scheduler scheduler) { + return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout, + backoff, jitter, scheduler, onRetry, onRetryMono, applicationContext); + } + + @Override + public Publisher<Long> apply(Flux<Throwable> errors) { + Instant timeoutInstant = calculateTimeout(); + DefaultContext<T> context = new DefaultContext<>(applicationContext, 0L, null, null); + return errors.index() + .concatMap(tuple -> retry(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context)); + } + + Publisher<Long> retry(Throwable e, long iteration, Instant timeoutInstant, DefaultContext<T> context) { + DefaultContext<T> tmpContext = new DefaultContext<>(applicationContext, iteration, context.lastBackoff, e); + BackoffDelay nextBackoff = calculateBackoff(tmpContext, timeoutInstant); + DefaultContext<T> retryContext = new DefaultContext<T>(applicationContext, iteration, nextBackoff, e); + context.lastBackoff = nextBackoff; + + if (!retryPredicate.test(retryContext)) { + log.debug("Stopping retries since predicate returned false, retry context: {}", retryContext); + return Mono.error(e); + } else if (nextBackoff == RETRY_EXHAUSTED) { + log.debug("Retries exhausted, retry context: {}", retryContext); + return Mono.error(new RetryExhaustedException(e)); + } else { + log.debug("Scheduling retry attempt, retry context: {}", retryContext); + onRetry.accept(retryContext); + return onRetryMono.apply(retryContext) + .then(Mono.from(retryMono(nextBackoff.delay()))); + } + } + + @Override + public String toString() { + return "Retry{max=" + this.maxIterations + ",backoff=" + backoff + ",jitter=" + + jitter + "}"; + } +} diff --git a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java new file mode 100644 index 0000000..075dcd9 --- /dev/null +++ b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java @@ -0,0 +1,122 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package reactor.retry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; + +public class RetryTestUtils { + + static void assertDelays(Queue<? extends IterationContext<?>> retries, Long... delayMs) { + assertEquals(delayMs.length, retries.size()); + int index = 0; + for (Iterator<? extends IterationContext<?>> it = retries.iterator(); it.hasNext(); ) { + IterationContext<?> repeatContext = it.next(); + assertEquals(delayMs[index].longValue(), repeatContext.backoff().toMillis()); + index++; + } + } + + static void assertRandomDelays(Queue<? extends IterationContext<?>> retries, int firstMs, int maxMs) { + long prevMs = 0; + int randomValues = 0; + for (IterationContext<?> context : retries) { + long backoffMs = context.backoff().toMillis(); + assertTrue("Unexpected delay " + backoffMs, backoffMs >= firstMs && backoffMs <= maxMs); + if (backoffMs != firstMs && backoffMs != prevMs) + randomValues++; + prevMs = backoffMs; + } + assertTrue("Delays not random", randomValues >= 2); // Allow for at most one edge case. + } + + static <T> void testReuseInParallel(int threads, int iterations, + Function<Backoff, Function<Flux<T>, Publisher<Long>>> retryOrRepeat, + Consumer<Function<Flux<T>, Publisher<Long>>> testTask) throws Exception { + int repeatCount = iterations - 1; + AtomicInteger nextBackoff = new AtomicInteger(); + // Keep track of the number of backoff invocations per instance + ConcurrentHashMap<Long, Integer> backoffCounts = new ConcurrentHashMap<>(); + // Use a countdown latch to get all instances to stop in the first backoff callback + CountDownLatch latch = new CountDownLatch(threads); + Backoff customBackoff = context -> { + Duration backoff = context.backoff(); + if (latch.getCount() > 0) { + assertNull("Wrong context, backoff must be null", backoff); + backoff = Duration.ofMillis(nextBackoff.incrementAndGet()); + backoffCounts.put(backoff.toMillis(), 1); + latch.countDown(); + try { + latch.await(10, TimeUnit.SECONDS); + } + catch (Exception e) { + // ignore, errors are handled later + } + } else { + assertNotNull("Wrong context, backoff must not be null", backoff); + long index = backoff.toMillis(); + backoffCounts.put(index, backoffCounts.get(index) + 1); + } + return new BackoffDelay(backoff); + }; + Function<Flux<T>, Publisher<Long>> retryFunc = retryOrRepeat.apply(customBackoff); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<?>> futures = new ArrayList<>(); + try { + for (int i = 0; i < threads; i++) { + Runnable runnable = () -> testTask.accept(retryFunc); + futures.add(executor.submit(runnable)); + } + for (Future<?> future : futures) + future.get(5, TimeUnit.SECONDS); + } + finally { + executor.shutdownNow(); + } + + assertEquals(0, latch.getCount()); + assertEquals(threads, backoffCounts.size()); + for (Integer count : backoffCounts.values()) { + //backoff not invoked anymore when maxIteration reached + assertEquals(repeatCount, count.intValue()); + } + } +} diff --git a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java new file mode 100644 index 0000000..2f2aad5 --- /dev/null +++ b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java @@ -0,0 +1,391 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package reactor.retry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.net.SocketException; +import java.time.Duration; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +import org.junit.Test; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +public class RetryWithAsyncCallbackTest { + + private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>(); + + @Test + public void shouldTimeoutRetryWithVirtualTime() { + // given + final int minBackoff = 1; + final int maxBackoff = 5; + final int timeout = 10; + + // then + StepVerifier.withVirtualTime(() -> + Mono.<String>error(new RuntimeException("Something went wrong")) + .retryWhen(RetryWithAsyncCallback.anyOf(Exception.class) + .exponentialBackoffWithJitter(Duration.ofSeconds(minBackoff), Duration.ofSeconds(maxBackoff)) + .timeout(Duration.ofSeconds(timeout))) + .subscribeOn(Schedulers.elastic())) + .expectSubscription() +// .expectNoEvent(Duration.ofSeconds(timeout)) + .thenAwait(Duration.ofSeconds(timeout)) + .expectError(RetryExhaustedException.class) + .verify(Duration.ofSeconds(timeout)); + } + + @Test + public void fluxRetryNoBackoff() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())) + .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry())); + + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1, 0, 1) + .verifyError(RetryExhaustedException.class); + assertRetries(IOException.class, IOException.class); + RetryTestUtils.assertDelays(retries, 0L, 0L); + } + + @Test + public void monoRetryNoBackoff() { + Mono<?> mono = Mono.error(new IOException()) + .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry())); + + StepVerifier.create(mono) + .verifyError(RetryExhaustedException.class); + assertRetries(IOException.class, IOException.class); + RetryTestUtils.assertDelays(retries, 0L, 0L); + } + + @Test + public void fluxRetryFixedBackoff() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())) + .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry())); + + StepVerifier.withVirtualTime(() -> flux) + .expectNext(0, 1) + .expectNoEvent(Duration.ofMillis(300)) + .thenAwait(Duration.ofMillis(300)) + .expectNext(0, 1) + .verifyError(RetryExhaustedException.class); + assertRetries(IOException.class); + RetryTestUtils.assertDelays(retries, 500L); + } + + @Test + public void monoRetryFixedBackoff() { + Mono<?> mono = Mono.error(new IOException()) + .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry())); + + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(300)) + .thenAwait(Duration.ofMillis(300)) + .verifyError(RetryExhaustedException.class); + + assertRetries(IOException.class); + RetryTestUtils.assertDelays(retries, 500L); + } + + + @Test + public void fluxRetryExponentialBackoff() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())) + .retryWhen(RetryWithAsyncCallback.any() + .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500)) + .timeout(Duration.ofMillis(1500)) + .doOnRetry(onRetry())); + + StepVerifier.create(flux) + .expectNext(0, 1) + .expectNoEvent(Duration.ofMillis(50)) // delay=100 + .expectNext(0, 1) + .expectNoEvent(Duration.ofMillis(150)) // delay=200 + .expectNext(0, 1) + .expectNoEvent(Duration.ofMillis(250)) // delay=400 + .expectNext(0, 1) + .expectNoEvent(Duration.ofMillis(450)) // delay=500 + .expectNext(0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class)); + + assertRetries(IOException.class, IOException.class, IOException.class, IOException.class); + RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L); + } + @Test + public void monoRetryExponentialBackoff() { + Mono<?> mono = Mono.error(new IOException()) + .retryWhen(RetryWithAsyncCallback.any() + .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500)) + .retryMax(4) + .doOnRetry(onRetry())); + + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .thenAwait(Duration.ofMillis(100)) + .thenAwait(Duration.ofMillis(200)) + .thenAwait(Duration.ofMillis(400)) + .thenAwait(Duration.ofMillis(500)) + .verifyError(RetryExhaustedException.class); + + assertRetries(IOException.class, IOException.class, IOException.class, IOException.class); + RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L); + } + + @Test + public void fluxRetryRandomBackoff() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())) + .retryWhen(RetryWithAsyncCallback.any() + .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000)) + .retryMax(4) + .doOnRetry(onRetry())); + + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class)); + + assertRetries(IOException.class, IOException.class, IOException.class, IOException.class); + RetryTestUtils.assertRandomDelays(retries, 100, 2000); + } + + @Test + public void monoRetryRandomBackoff() { + Mono<?> mono = Mono.error(new IOException()) + .retryWhen(RetryWithAsyncCallback.any() + .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000)) + .retryMax(4) + .doOnRetry(onRetry())); + + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .thenAwait(Duration.ofMillis(100)) + .thenAwait(Duration.ofMillis(2000)) + .thenAwait(Duration.ofMillis(2000)) + .thenAwait(Duration.ofMillis(2000)) + .verifyError(RetryExhaustedException.class); + + assertRetries(IOException.class, IOException.class, IOException.class, IOException.class); + RetryTestUtils.assertRandomDelays(retries, 100, 2000); + } + + + @Test + public void fluxRetriableExceptions() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())) + .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry())); + + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + + Flux<Integer> nonRetriable = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException())) + .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry())); + StepVerifier.create(nonRetriable) + .expectNext(0, 1) + .verifyError(RuntimeException.class); + + } + + @Test + public void fluxNonRetriableExceptions() { + + Retry<?> retry = RetryWithAsyncCallback.allBut(RuntimeException.class).retryOnce().doOnRetry(onRetry()); + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IllegalStateException())).retryWhen(retry); + + StepVerifier.create(flux) + .expectNext(0, 1) + .verifyError(IllegalStateException.class); + + + Flux<Integer> retriable = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry); + StepVerifier.create(retriable) + .expectNext(0, 1, 0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + } + + @Test + public void fluxRetryAnyException() { + Retry<?> retry = RetryWithAsyncCallback.any().retryOnce().doOnRetry(onRetry()); + + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry); + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + + Flux<Integer> flux2 = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException())).retryWhen(retry); + StepVerifier.create(flux2) + .expectNext(0, 1, 0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class)); + + } + + @Test + public void fluxRetryOnPredicate() { + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())) + .retryWhen(RetryWithAsyncCallback.onlyIf(context -> context.iteration() < 3).doOnRetry(onRetry())); + + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1, 0, 1) + .verifyError(SocketException.class); + } + + + @Test + public void doOnRetry() { + Semaphore semaphore = new Semaphore(0); + Retry<?> retry = RetryWithAsyncCallback.any() + .retryOnce() + .fixedBackoff(Duration.ofMillis(500)) + .doOnRetry(context -> semaphore.release()); + + StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry)) + .expectNext(0, 1) + .then(semaphore::acquireUninterruptibly) + .expectNoEvent(Duration.ofMillis(400)) + .thenAwait(Duration.ofMillis(200)) + .expectNext(0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + + StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff())) + .then(semaphore::acquireUninterruptibly) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + } + + @Test + public void onRetryWithMono() { + Semaphore semaphore = new Semaphore(0); + Retry<?> retry = RetryWithAsyncCallback.any() + .retryOnce() + .fixedBackoff(Duration.ofMillis(500)) + .onRetryWithMono(context -> Mono.fromCallable(() -> { semaphore.release(); return 0; })); + + StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry)) + .expectNext(0, 1) + .then(semaphore::acquireUninterruptibly) + .expectNoEvent(Duration.ofMillis(400)) + .thenAwait(Duration.ofMillis(200)) + .expectNext(0, 1) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + + StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff())) + .then(semaphore::acquireUninterruptibly) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)); + } + + @Test + public void retryApplicationContext() { + class AppContext { + boolean needsRollback; + void rollback() { + needsRollback = false; + } + void run() { + assertFalse("Rollback not performed", needsRollback); + needsRollback = true; + } + } + AppContext appContext = new AppContext(); + Retry<?> retry = RetryWithAsyncCallback.<AppContext>any().withApplicationContext(appContext) + .retryMax(2) + .doOnRetry(context -> { + AppContext ac = context.applicationContext(); + assertNotNull("Application context not propagated", ac); + ac.rollback(); + }); + + StepVerifier.withVirtualTime(() -> Mono.error(new RuntimeException()).doOnNext(i -> appContext.run()).retryWhen(retry)) + .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class)); + + } + + @Test + public void fluxRetryCompose() { + Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry()); + Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())).as(retry::apply); + + StepVerifier.create(flux) + .expectNext(0, 1, 0, 1, 0, 1) + .verifyError(RetryExhaustedException.class); + assertRetries(IOException.class, IOException.class); + } + + @Test + public void monoRetryCompose() { + Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry()); + Flux<?> flux = Mono.error(new IOException()).as(retry::apply); + + StepVerifier.create(flux) + .verifyError(RetryExhaustedException.class); + assertRetries(IOException.class, IOException.class); + } + + @Test + public void functionReuseInParallel() throws Exception { + int retryCount = 19; + int range = 100; + Integer[] values = new Integer[(retryCount + 1) * range]; + for (int i = 0; i <= retryCount; i++) { + for (int j = 1; j <= range; j++) + values[i * range + j - 1] = j; + } + RetryTestUtils.testReuseInParallel(2, 20, + backoff -> RetryWithAsyncCallback.<Integer>any().retryMax(19).backoff(backoff), + retryFunc -> StepVerifier.create(Flux.range(1, range).concatWith(Mono.error(new SocketException())).retryWhen(retryFunc)) + .expectNext(values) + .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class))); + } + + Consumer<? super RetryContext<?>> onRetry() { + return context -> retries.add(context); + } + + @SafeVarargs + private final void assertRetries(Class<? extends Throwable>... exceptions) { + assertEquals(exceptions.length, retries.size()); + int index = 0; + for (Iterator<RetryContext<?>> it = retries.iterator(); it.hasNext(); ) { + RetryContext<?> retryContext = it.next(); + assertEquals(index + 1, retryContext.iteration()); + assertEquals(exceptions[index], retryContext.exception().getClass()); + index++; + } + } + + static boolean isRetryExhausted(Throwable e, Class<? extends Throwable> cause) { + return e instanceof RetryExhaustedException && cause.isInstance(e.getCause()); + } + + @Test + public void retryToString() { + System.out.println(RetryWithAsyncCallback.any().noBackoff().retryMax(2).toString()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
