This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6712e3d87e83a8a4c84c838bb611f519f8dea137
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon May 4 16:48:53 2020 +0200

    CAMEL-15008: ReactiveExecutor should run scheduled tasks more fairly
---
 .../processor/aggregate/AggregateProcessor.java    | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index bdbff35..aada58c 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -860,18 +860,16 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
AggregateOnCompletion(exchange.getExchangeId()));
 
         // send this exchange
-        // the call to schedule last if needed to ensure in-order processing 
of the aggregates
-        executorService.execute(() -> {
-            processor.process(exchange, done -> {
-                // log exception if there was a problem
-                if (exchange.getException() != null) {
-                    // if there was an exception then let the exception 
handler handle it
-                    getExceptionHandler().handleException("Error processing 
aggregated exchange", exchange, exchange.getException());
-                } else {
-                    LOG.trace("Processing aggregated exchange: {} complete.", 
exchange);
-                }
-            });
-        });
+        // the call to schedule is needed to ensure in-order processing of the 
aggregates
+        executorService.execute(() -> reactiveExecutor.schedule(() -> 
processor.process(exchange, done -> {
+            // log exception if there was a problem
+            if (exchange.getException() != null) {
+                // if there was an exception then let the exception handler 
handle it
+                getExceptionHandler().handleException("Error processing 
aggregated exchange", exchange, exchange.getException());
+            } else {
+                LOG.trace("Processing aggregated exchange: {} complete.", 
exchange);
+            }
+        })));
     }
 
     /**

Reply via email to