This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 875fb0ec655f9d6cd83ec10e067a3bbb28eda649
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon May 4 16:12:21 2020 +0200

    CAMEL-15008: ReactiveExecutor should run scheduled tasks more fairly
---
 .../java/org/apache/camel/spi/ReactiveExecutor.java  |  7 +++++++
 .../camel/impl/engine/DefaultReactiveExecutor.java   |  5 +++++
 .../processor/aggregate/AggregateProcessor.java      | 20 +++++++++++---------
 .../SplitAggregateStackOverflowIssueTest.java        |  6 +++---
 4 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 3de9d65..3f30d59 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -34,6 +34,13 @@ public interface ReactiveExecutor {
     void schedule(Runnable runnable);
 
     /**
+     * Schedules the task to be run first
+     *
+     * @param runnable    the task
+     */
+    void scheduleFirst(Runnable runnable);
+
+    /**
      * Schedules the task to be prioritized and run asap
      *
      * @param runnable    the task
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 406e21f..ccf7094 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -52,6 +52,11 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
 
     @Override
     public void schedule(Runnable runnable) {
+        workers.get().schedule(runnable, false, false, false);
+    }
+
+    @Override
+    public void scheduleFirst(Runnable runnable) {
         workers.get().schedule(runnable, true, false, false);
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 638b9f5..bdbff35 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -861,15 +861,17 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         // send this exchange
         // the call to schedule last if needed to ensure in-order processing of the aggregates
-        executorService.submit(() -> reactiveExecutor.scheduleSync(() -> processor.process(exchange, done -> {
-            // log exception if there was a problem
-            if (exchange.getException() != null) {
-                // if there was an exception then let the exception handler handle it
-                getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
-            } else {
-                LOG.trace("Processing aggregated exchange: {} complete.", exchange);
-            }
-        })));
+        executorService.execute(() -> {
+            processor.process(exchange, done -> {
+                // log exception if there was a problem
+                if (exchange.getException() != null) {
+                    // if there was an exception then let the exception handler handle it
+                    getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
+                } else {
+                    LOG.trace("Processing aggregated exchange: {} complete.", exchange);
+                }
+            });
+        });
     }
 
     /**
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
index ad8098e..d5fcaf7 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
@@ -14,7 +14,6 @@ import org.junit.Test;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.camel.Exchange.SPLIT_COMPLETE;
 
-@Ignore("TODO: Fix this bug")
 public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport {
 
     private final AtomicInteger count = new AtomicInteger();
@@ -36,7 +35,7 @@ public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport {
 
         MockEndpoint.assertIsSatisfied(60, SECONDS, result);
 
-        assertTrue("Stackframe must not be too high, was " + count.get(), count.get() < 50);
+        assertTrue("Stackframe must not be too high, was " + count.get(), count.get() < 100);
     }
 
     @Override
@@ -47,8 +46,9 @@ public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport {
 
                 from("direct:start")
                     .split().tokenize("\n").streaming()
+                        .to("log:input?groupSize=100")
                         .process(e -> {
-                            if (e.getProperty(Exchange.SPLIT_INDEX, 0, int.class) % 100 == 0) {
+                            if (e.getProperty(Exchange.SPLIT_INDEX, 0, int.class) % 1000 == 0) {
                                 int frames = Thread.currentThread().getStackTrace().length;
                                 count.set(frames);
                                 log.info("Stackframe: {}", frames);

Reply via email to