This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6712e3d87e83a8a4c84c838bb611f519f8dea137 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 4 16:48:53 2020 +0200 CAMEL-15008: ReactiveExecutor should run scheduled tasks more fairly --- .../processor/aggregate/AggregateProcessor.java | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index bdbff35..aada58c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -860,18 +860,16 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat exchange.adapt(ExtendedExchange.class).addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); // send this exchange - // the call to schedule last if needed to ensure in-order processing of the aggregates - executorService.execute(() -> { - processor.process(exchange, done -> { - // log exception if there was a problem - if (exchange.getException() != null) { - // if there was an exception then let the exception handler handle it - getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); - } else { - LOG.trace("Processing aggregated exchange: {} complete.", exchange); - } - }); - }); + // the call to schedule is needed to ensure in-order processing of the aggregates + executorService.execute(() -> reactiveExecutor.schedule(() -> processor.process(exchange, done -> { + // log exception if there was a problem + if (exchange.getException() != null) { + // if there was an exception then let the exception handler handle it + getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); + } else { + LOG.trace("Processing aggregated exchange: {} complete.", exchange); + } + }))); } /**