This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 875fb0ec655f9d6cd83ec10e067a3bbb28eda649 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 4 16:12:21 2020 +0200 CAMEL-15008: ReactiveExecutor should run scheduled tasks more fairly --- .../java/org/apache/camel/spi/ReactiveExecutor.java | 7 +++++++ .../camel/impl/engine/DefaultReactiveExecutor.java | 5 +++++ .../processor/aggregate/AggregateProcessor.java | 20 +++++++++++--------- .../SplitAggregateStackOverflowIssueTest.java | 6 +++--- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index 3de9d65..3f30d59 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -34,6 +34,13 @@ public interface ReactiveExecutor { void schedule(Runnable runnable); /** + * Schedules the task to be run first + * + * @param runnable the task + */ + void scheduleFirst(Runnable runnable); + + /** * Schedules the task to be prioritized and run asap * * @param runnable the task diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 406e21f..ccf7094 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -52,6 +52,11 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE @Override public void schedule(Runnable runnable) { + workers.get().schedule(runnable, false, false, false); + } + + @Override + public void scheduleFirst(Runnable runnable) { workers.get().schedule(runnable, true, false, false); } diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 638b9f5..bdbff35 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -861,15 +861,17 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat // send this exchange // the call to schedule last if needed to ensure in-order processing of the aggregates - executorService.submit(() -> reactiveExecutor.scheduleSync(() -> processor.process(exchange, done -> { - // log exception if there was a problem - if (exchange.getException() != null) { - // if there was an exception then let the exception handler handle it - getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); - } else { - LOG.trace("Processing aggregated exchange: {} complete.", exchange); - } - }))); + executorService.execute(() -> { + processor.process(exchange, done -> { + // log exception if there was a problem + if (exchange.getException() != null) { + // if there was an exception then let the exception handler handle it + getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); + } else { + LOG.trace("Processing aggregated exchange: {} complete.", exchange); + } + }); + }); } /** diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java index ad8098e..d5fcaf7 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java +++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java @@ -14,7 +14,6 @@ import org.junit.Test; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.camel.Exchange.SPLIT_COMPLETE; -@Ignore("TODO: Fix this bug") public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport { private final AtomicInteger count = new AtomicInteger(); @@ -36,7 +35,7 @@ public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport { MockEndpoint.assertIsSatisfied(60, SECONDS, result); - assertTrue("Stackframe must not be too high, was " + count.get(), count.get() < 50); + assertTrue("Stackframe must not be too high, was " + count.get(), count.get() < 100); } @Override @@ -47,8 +46,9 @@ public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport { from("direct:start") .split().tokenize("\n").streaming() + .to("log:input?groupSize=100") .process(e -> { - if (e.getProperty(Exchange.SPLIT_INDEX, 0, int.class) % 100 == 0) { + if (e.getProperty(Exchange.SPLIT_INDEX, 0, int.class) % 1000 == 0) { int frames = Thread.currentThread().getStackTrace().length; count.set(frames); log.info("Stackframe: {}", frames);