Repository: camel Updated Branches: refs/heads/master a030a2231 -> 637e2b6ce
Improve SupervisingRouteController Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/637e2b6c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/637e2b6c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/637e2b6c Branch: refs/heads/master Commit: 637e2b6ce2303d0bca91f6789ab39835da795d5a Parents: a030a22 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Aug 9 16:20:56 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Aug 9 16:21:05 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/SupervisingRouteController.java | 81 +++++++------ .../camel/util/backoff/BackOffContext.java | 56 ++++++++- .../apache/camel/util/backoff/BackOffTimer.java | 119 ++++++++++++++----- .../camel/util/backoff/BackOffTimerTest.java | 76 ++++++++++-- .../SupervisingRouteControllerRestartTest.java | 6 + 5 files changed, 252 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/637e2b6c/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java index 3054ac4..a9535e3 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java +++ b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -52,7 +51,6 @@ import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.support.EventNotifierSupport; -import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.backoff.BackOff; import org.apache.camel.util.backoff.BackOffContext; @@ -192,6 +190,10 @@ public class SupervisingRouteController extends DefaultRouteController { return Collections.unmodifiableList(filters); } + public Optional<BackOffContext> getBackOffContext(String id) { + return routeManager.getBackOffContext(id); + } + // ********************************* // Lifecycle // ********************************* @@ -337,41 +339,34 @@ public class SupervisingRouteController extends DefaultRouteController { private void doStopRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { synchronized (lock) { if (checker) { - // remove them from checked routes so they don't get started by the - // routes check task as a manual operation on the routes indicates that - // the route is then managed manually + // remove it from checked routes so the route don't get started + // by the routes manager task as a manual operation on the routes + // indicates that the route is then managed manually routeManager.release(route); } - ServiceStatus status = route.getStatus(); - if (!status.isStoppable()) { - LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); - return; - } - - consumer.accept(route); + LOGGER.info("Route {} has been requested to stop: stop supervising it", route.getId()); // Mark the route as un-managed route.getContext().setRouteController(null); + + consumer.accept(route); } } private void doStartRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { synchronized (lock) { - ServiceStatus status = route.getStatus(); - if (!status.isStartable()) { - LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); - return; - } + // If a manual start is triggered, then the controller should take + // care that the route is started + route.getContext().setRouteController(this); try { if (checker) { + // remove it from checked routes as a manual start may trigger + // a new back off task if start fails routeManager.release(route); } - // Mark the route as managed - route.getContext().setRouteController(this); - consumer.accept(route); } catch (Exception e) { @@ -444,7 +439,7 @@ public class SupervisingRouteController extends DefaultRouteController { private class RouteManager { private final Logger logger; - private final ConcurrentMap<RouteHolder, CompletableFuture<BackOffContext>> routes; + private final ConcurrentMap<RouteHolder, BackOffTimer.Task> routes; RouteManager() { this.logger = LoggerFactory.getLogger(RouteManager.class); @@ -461,9 +456,7 @@ public class SupervisingRouteController extends DefaultRouteController { logger.info("Start supervising route: {} with back-off: {}", r.getId(), backOff); - // Return this future as cancel does not have effect on the - // computation (future chain) - CompletableFuture<BackOffContext> future = timer.schedule(backOff, context -> { + BackOffTimer.Task task = timer.schedule(backOff, context -> { try { logger.info("Try to restart route: {}", r.getId()); @@ -474,22 +467,19 @@ public class SupervisingRouteController extends DefaultRouteController { } }); - future.whenComplete((context, throwable) -> { - if (context == null || context.isExhausted()) { - // This indicates that the future has been cancelled + task.whenComplete((context, throwable) -> { + if (context == null || context.getStatus() != BackOffContext.Status.Active) { + // This indicates that the task has been cancelled // or that back-off retry is exhausted thus if the - // route is not started it is moved out of the supervisor. - - if (context != null && context.isExhausted()) { - LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made", route.getId()); - } + // route is not started it is moved out of the + // supervisor control. synchronized (lock) { final ServiceStatus status = route.getStatus(); + final boolean stopped = status.isStopped() || status.isStopping(); - if (status.isStopped() || status.isStopping()) { - LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status); - + if (context != null && context.getStatus() == BackOffContext.Status.Exhausted && stopped) { + LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made and stop supervising it", route.getId()); r.getContext().setRouteController(null); } } @@ -498,24 +488,33 @@ public class SupervisingRouteController extends DefaultRouteController { routes.remove(r); }); - return future; + return task; } ); } boolean release(RouteHolder route) { - CompletableFuture<BackOffContext> future = routes.remove(route); - if (future != null) { - future.cancel(true); + BackOffTimer.Task task = routes.remove(route); + if (task != null) { + LOGGER.info("Cancel restart task for route {}", route.getId()); + task.cancel(); } - return future != null; + return task != null; } void clear() { - routes.forEach((k, v) -> v.cancel(true)); + routes.values().forEach(BackOffTimer.Task::cancel); routes.clear(); } + + public Optional<BackOffContext> getBackOffContext(String id) { + return routes.entrySet().stream() + .filter(e -> ObjectHelper.equal(e.getKey().getId(), id)) + .findFirst() + .map(Map.Entry::getValue) + .map(BackOffTimer.Task::getContext); + } } // ********************************* http://git-wip-us.apache.org/repos/asf/camel/blob/637e2b6c/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java index 617f0a8..2358d13 100644 --- a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java @@ -20,18 +20,30 @@ package org.apache.camel.util.backoff; * The context associated to a back-off operation. */ public final class BackOffContext { + public enum Status { + Active, + Inactive, + Exhausted + } + private final BackOff backOff; + private Status status; private long currentAttempts; private long currentDelay; private long currentElapsedTime; + private long lastAttemptTime; + private long nextAttemptTime; public BackOffContext(BackOff backOff) { this.backOff = backOff; + this.status = Status.Active; this.currentAttempts = 0; this.currentDelay = backOff.getDelay().toMillis(); this.currentElapsedTime = 0; + this.lastAttemptTime = BackOff.NEVER; + this.nextAttemptTime = BackOff.NEVER; } // ************************************* @@ -46,6 +58,13 @@ public final class BackOffContext { } /** + * Gets the context status. + */ + public Status getStatus() { + return status; + } + + /** * The number of attempts so far. */ public long getCurrentAttempts() { @@ -67,10 +86,31 @@ public final class BackOffContext { } /** - * Inform if the context is exhausted thus not more attempts should be made. + * The time the last attempt has been performed. + */ + public long getLastAttemptTime() { + return lastAttemptTime; + } + + /** + * Used by BackOffTimer + */ + void setLastAttemptTime(long lastAttemptTime) { + this.lastAttemptTime = lastAttemptTime; + } + + /** + * An indication about the time the next attempt will be made. + */ + public long getNextAttemptTime() { + return nextAttemptTime; + } + + /** + * Used by BackOffTimer */ - public boolean isExhausted() { - return currentDelay == BackOff.NEVER; + void setNextAttemptTime(long nextAttemptTime) { + this.nextAttemptTime = nextAttemptTime; } // ************************************* @@ -86,14 +126,16 @@ public final class BackOffContext { // A call to next when currentDelay is set to NEVER has no effects // as this means that either the timer is exhausted or it has explicit // stopped - if (currentDelay != BackOff.NEVER) { + if (status == Status.Active) { currentAttempts++; if (currentAttempts > backOff.getMaxAttempts()) { currentDelay = BackOff.NEVER; + status = Status.Exhausted; } else if (currentElapsedTime > backOff.getMaxElapsedTime().toMillis()) { currentDelay = BackOff.NEVER; + status = Status.Exhausted; } else { if (currentDelay <= backOff.getMaxDelay().toMillis()) { currentDelay = (long) (currentDelay * backOff.getMultiplier()); @@ -113,6 +155,9 @@ public final class BackOffContext { this.currentAttempts = 0; this.currentDelay = 0; this.currentElapsedTime = 0; + this.lastAttemptTime = BackOff.NEVER; + this.nextAttemptTime = BackOff.NEVER; + this.status = Status.Active; return this; } @@ -125,6 +170,9 @@ public final class BackOffContext { this.currentAttempts = 0; this.currentDelay = BackOff.NEVER; this.currentElapsedTime = 0; + this.lastAttemptTime = BackOff.NEVER; + this.nextAttemptTime = BackOff.NEVER; + this.status = Status.Inactive; return this; } http://git-wip-us.apache.org/repos/asf/camel/blob/637e2b6c/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java index dbae257..b93bbb1 100644 --- a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java @@ -16,17 +16,25 @@ */ package org.apache.camel.util.backoff; -import java.util.concurrent.CompletableFuture; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.apache.camel.util.function.ThrowingFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple timer utility that use a linked {@link BackOff} to determine when * a task should be executed. */ public class BackOffTimer { + private static final Logger LOGGER = LoggerFactory.getLogger(BackOffTimer.class); + private final ScheduledExecutorService scheduler; public BackOffTimer(ScheduledExecutorService scheduler) { @@ -37,8 +45,8 @@ public class BackOffTimer { * Schedule the given function/task to be executed some time in the future * according to the given backOff. */ - public CompletableFuture<BackOffContext> schedule(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { - final Task task = new Task(backOff, function); + public Task schedule(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { + final TaskImpl task = new TaskImpl(backOff, function); long delay = task.getContext().next(); if (delay != BackOff.NEVER) { @@ -54,54 +62,107 @@ public class BackOffTimer { // TimerTask // **************************************** - private final class Task extends CompletableFuture<BackOffContext> implements Runnable { + public interface Task { + /** + * Gets the {@link BackOffContext} associated with this task. + */ + BackOffContext getContext(); + + /** + * Cancel the task. + */ + void cancel(); + + /** + * Action to execute when the context is completed (cancelled or exhausted) + * + * @param whenCompleted the consumer. + */ + void whenComplete(BiConsumer<BackOffContext, Throwable> whenCompleted); + } + + // **************************************** + // TimerTask + // **************************************** + + private final class TaskImpl implements Task, Runnable { private final BackOffContext context; private final ThrowingFunction<BackOffContext, Boolean, Exception> function; + private final AtomicReference<ScheduledFuture<?>> futureRef; + private final List<BiConsumer<BackOffContext, Throwable>> consumers; - Task(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { + TaskImpl(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { this.context = new BackOffContext(backOff); this.function = function; + this.consumers = new ArrayList<>(); + this.futureRef = new AtomicReference<>(); } @Override public void run() { - if (context.isExhausted() || isDone() || isCancelled()) { - if (!isDone()) { - complete(); - } - - return; - } - - try { - if (function.apply(context)) { - long delay = context.next(); - if (context.isExhausted()) { + if (context.getStatus() == BackOffContext.Status.Active) { + try { + final long currentTime = System.currentTimeMillis(); + + context.setLastAttemptTime(currentTime); + + if (function.apply(context)) { + long delay = context.next(); + if (context.getStatus() != BackOffContext.Status.Active) { + // if the call to next makes the context not more + // active, signal task completion. + complete(); + } else { + context.setNextAttemptTime(currentTime + delay); + + // Cache the scheduled future so it can be cancelled + // later by Task.cancel() + futureRef.lazySet(scheduler.schedule(this, delay, TimeUnit.MILLISECONDS)); + } + } else { + // if the function return false no more attempts should + // be made so stop the context. + context.stop(); + + // and signal the task as completed. complete(); - } else if (!context.isExhausted() && !isDone() && !isCancelled()) { - scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); } - } else { - complete(); + } catch (Exception e) { + context.stop(); + consumers.forEach(c -> c.accept(context, e)); } - } catch (Exception e) { - completeExceptionally(e); } } @Override - public boolean cancel(boolean mayInterruptIfRunning) { + public BackOffContext getContext() { + return context; + } + + @Override + public void cancel() { context.stop(); - return super.cancel(mayInterruptIfRunning); + ScheduledFuture<?> future = futureRef.get(); + if (future != null) { + future.cancel(true); + } + + // signal task completion on cancel. + complete(); } - boolean complete() { - return super.complete(context); + @Override + public void whenComplete(BiConsumer<BackOffContext, Throwable> whenCompleted) { + synchronized (this.consumers) { + consumers.add(whenCompleted); + } } - BackOffContext getContext() { - return context; + void complete() { + synchronized (this.consumers) { + consumers.forEach(c -> c.accept(context, null)); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/637e2b6c/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java index 5c2cc9f..7d9f0e7 100644 --- a/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java +++ b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java @@ -16,9 +16,11 @@ */ package org.apache.camel.util.backoff; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -27,12 +29,13 @@ import org.junit.Test; public class BackOffTimerTest { @Test public void testBackOffTimer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(0); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); final BackOff backOff = BackOff.builder().delay(100).build(); final BackOffTimer timer = new BackOffTimer(executor); - timer.schedule( + BackOffTimer.Task task = timer.schedule( backOff, context -> { Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); @@ -42,23 +45,28 @@ public class BackOffTimerTest { return counter.get() < 5; } - ).thenAccept( - context -> { + ); + + task.whenComplete( + (context, throwable) -> { Assert.assertEquals(5, counter.get()); + latch.countDown(); } - ).get(5, TimeUnit.SECONDS); + ); + latch.await(5, TimeUnit.SECONDS); executor.shutdownNow(); } @Test public void testBackOffTimerWithMaxAttempts() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(0); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); final BackOff backOff = BackOff.builder().delay(100).maxAttempts(5L).build(); final BackOffTimer timer = new BackOffTimer(executor); - timer.schedule( + BackOffTimer.Task task = timer.schedule( backOff, context -> { Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); @@ -68,23 +76,29 @@ public class BackOffTimerTest { return true; } - ).thenAccept( - context -> { + ); + + task.whenComplete( + (context, throwable) -> { Assert.assertEquals(5, counter.get()); + Assert.assertEquals(BackOffContext.Status.Exhausted, context.getStatus()); + latch.countDown(); } - ).get(5, TimeUnit.SECONDS); + ); + latch.await(5, TimeUnit.SECONDS); executor.shutdownNow(); } @Test public void testBackOffTimerWithMaxElapsedTime() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(0); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); final BackOff backOff = BackOff.builder().delay(100).maxElapsedTime(400).build(); final BackOffTimer timer = new BackOffTimer(executor); - timer.schedule( + BackOffTimer.Task task = timer.schedule( backOff, context -> { Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); @@ -94,11 +108,49 @@ public class BackOffTimerTest { return true; } - ).thenAccept( - context -> { + ); + + task.whenComplete( + (context, throwable) -> { Assert.assertTrue(counter.get() <= 5); + Assert.assertEquals(BackOffContext.Status.Exhausted, context.getStatus()); + latch.countDown(); } - ).get(5, TimeUnit.SECONDS); + ); + + latch.await(5, TimeUnit.SECONDS); + executor.shutdownNow(); + } + + @Test + public void testBackOffTimerStop() throws Exception { + final CountDownLatch latch = new CountDownLatch(5); + final AtomicBoolean done = new AtomicBoolean(false); + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); + final BackOff backOff = BackOff.builder().delay(100).build(); + final BackOffTimer timer = new BackOffTimer(executor); + + BackOffTimer.Task task = timer.schedule( + backOff, + context -> { + Assert.assertEquals(BackOffContext.Status.Active, context.getStatus()); + + latch.countDown(); + + return false; + } + ); + + task.whenComplete( + (context, throwable) -> { + Assert.assertEquals(BackOffContext.Status.Inactive, context.getStatus()); + done.set(true); + } + ); + + latch.await(2, TimeUnit.SECONDS); + task.cancel(); + Assert.assertTrue(done.get()); executor.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/camel/blob/637e2b6c/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java index 74c8f09..cbe7b2b 100644 --- a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java +++ b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/SupervisingRouteControllerRestartTest.java @@ -24,6 +24,7 @@ import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.SupervisingRouteController; import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.util.backoff.BackOffContext; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -94,6 +95,10 @@ public class SupervisingRouteControllerRestartTest { // Wait for at lest one restart attempt. Thread.sleep(2000); + Assert.assertTrue(controller.getBackOffContext("jetty").isPresent()); + Assert.assertEquals(BackOffContext.Status.Active, controller.getBackOffContext("jetty").get().getStatus()); + Assert.assertTrue(controller.getBackOffContext("jetty").get().getCurrentAttempts() > 0); + try { socket.close(); } catch (Exception e) { @@ -105,6 +110,7 @@ public class SupervisingRouteControllerRestartTest { Assert.assertEquals(ServiceStatus.Started, context.getRouteStatus("jetty")); Assert.assertNotNull(context.getRoute("jetty").getRouteContext().getRouteController()); + Assert.assertFalse(controller.getBackOffContext("jetty").isPresent()); } // *************************************