This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit adb3c4b1496ddba907b095a41603858a3a7412af Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 4 17:24:39 2020 +0200 CAMEL-14996: Okay, after diving more into this, CAMEL-15008 fixed the root problem, so we don't need to separate the multicast EIP into parallel vs sequential tasks to fix the endless stack frame. So reverting this code back, plus a little polish. --- .../apache/camel/processor/MulticastProcessor.java | 207 +++------------------ 1 file changed, 29 insertions(+), 178 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index d0b1e2c..7d542ae 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; @@ -177,9 +176,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) { this(camelContext, route, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, - shareUnitOfWork, parallelAggregate, false); + shareUnitOfWork, parallelAggregate, false); } - + public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean 
shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, @@ -260,15 +259,14 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat return true; } + MulticastTask state = new MulticastTask(exchange, pairs, callback); if (isParallelProcessing()) { - MulticastParallelTask task = new MulticastParallelTask(exchange, pairs, callback); - executorService.submit(() -> reactiveExecutor.schedule(task)); + executorService.submit(() -> reactiveExecutor.schedule(state)); } else { - MulticastTask task = new MulticastTask(exchange, pairs, callback); if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(task); + reactiveExecutor.scheduleSync(state); } else { - reactiveExecutor.scheduleMain(task); + reactiveExecutor.scheduleMain(state); } } @@ -286,66 +284,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } - private interface MulticastCompletionService { - - Exchange poll(); - - Exchange pollUnordered(); - - void submit(Consumer<Consumer<Exchange>> runner); - - } - - private class MulticastCompletionServiceParallelTask implements MulticastCompletionService { - private final AsyncCompletionService<Exchange> completion; - - public MulticastCompletionServiceParallelTask(ReentrantLock lock) { - this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock);; - } - - @Override - public Exchange poll() { - return completion.poll(); - } - - @Override - public Exchange pollUnordered() { - return completion.pollUnordered(); - } - - @Override - public void submit(Consumer<Consumer<Exchange>> runner) { - completion.submit(runner); - } - } - - private class MulticastCompletionServiceTask implements MulticastCompletionService { - - private final AtomicReference<Exchange> exchange = new AtomicReference<>(); - - public MulticastCompletionServiceTask() { - } - - @Override - public Exchange poll() { - return 
exchange.getAndSet(null); - } - - @Override - public Exchange pollUnordered() { - return exchange.getAndSet(null); - } - - @Override - public void submit(Consumer<Consumer<Exchange>> runner) { - runner.accept(this::setResult); - } - - private void setResult(Exchange result) { - this.exchange.set(result); - } - } - protected class MulticastTask implements Runnable { final Exchange original; @@ -353,7 +291,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat final AsyncCallback callback; final Iterator<ProcessorExchangePair> iterator; final ReentrantLock lock; - MulticastCompletionService completion; + final AsyncCompletionService<Exchange> completion; final AtomicReference<Exchange> result; final AtomicInteger nbExchangeSent = new AtomicInteger(); final AtomicInteger nbAggregated = new AtomicInteger(); @@ -366,8 +304,11 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat this.callback = callback; this.iterator = pairs.iterator(); this.lock = new ReentrantLock(); - this.completion = new MulticastCompletionServiceTask(); + this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock); this.result = new AtomicReference<>(); + if (timeout > 0) { + schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); + } } @Override @@ -375,10 +316,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat return "MulticastTask"; } - private Exchange completionPoll() { - return completion.poll(); - } - @Override public void run() { try { @@ -407,7 +344,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat int index = nbExchangeSent.getAndIncrement(); updateNewExchange(exchange, index, pairs, hasNext); - if (!hasNext) { + // Schedule the processing of the next pair + if (hasNext) { + if (isParallelProcessing()) { + schedule(this); + } + } else { allSent.set(true); } @@ -442,7 +384,8 @@ public 
class MulticastProcessor extends AsyncProcessorSupport implements Navigat // aggregate exchanges if any aggregate(); - if (hasNext) { + // next step + if (hasNext && !isParallelProcessing()) { schedule(this); } }); @@ -458,7 +401,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (lock.tryLock()) { try { Exchange exchange; - while (!done.get() && (exchange = completionPoll()) != null) { + while (!done.get() && (exchange = completion.poll()) != null) { doAggregate(result, exchange, original); if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) { doDone(result.get(), true); @@ -474,101 +417,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } - protected void doDone(Exchange exchange, boolean forceExhaust) { - if (done.compareAndSet(false, true)) { - MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); - } - } - } - - protected class MulticastParallelTask extends MulticastTask { - - MulticastParallelTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { - super(original, pairs, callback); - this.completion = new MulticastCompletionServiceParallelTask(lock); - if (timeout > 0) { - schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); - } - } - - @Override - public String toString() { - return "MulticastParallelTask"; - } - - @Override - public void run() { - try { - if (done.get()) { - return; - } - - // Check if the iterator is empty - // This can happen the very first time we check the existence - // of an item before queuing the run. 
- // or some iterators may return true for hasNext() but then null in next() - if (!iterator.hasNext()) { - doDone(result.get(), true); - return; - } - - ProcessorExchangePair pair = iterator.next(); - boolean hasNext = iterator.hasNext(); - // some iterators may return true for hasNext() but then null in next() - if (pair == null && !hasNext) { - doDone(result.get(), true); - return; - } - - Exchange exchange = pair.getExchange(); - int index = nbExchangeSent.getAndIncrement(); - updateNewExchange(exchange, index, pairs, hasNext); - - // Schedule the processing of the next pair - if (hasNext) { - schedule(this); - } else { - allSent.set(true); - } - - completion.submit(exchangeResult -> { - // compute time taken if sending to another endpoint - StopWatch watch = beforeSend(pair); - - AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor()); - async.process(exchange, doneSync -> { - afterSend(pair, watch); - - // Decide whether to continue with the multicast or not; similar logic to the Pipeline - // remember to test for stop on exception and aggregate before copying back results - boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Multicast processing failed for number " + index, LOG); - if (stopOnException && !continueProcessing) { - if (exchange.getException() != null) { - // wrap in exception to explain where it failed - exchange.setException(new CamelExchangeException("Multicast processing failed for number " + index, exchange, exchange.getException())); - } else { - // we want to stop on exception, and the exception was handled by the error handler - // this is similar to what the pipeline does, so we should do the same to not surprise end users - // so we should set the failed exchange as the result and be done - result.set(exchange); - } - // and do the done work - doDone(exchange, true); - return; - } - - exchangeResult.accept(exchange); - - // aggregate exchanges if any - aggregate(); - }); - }); - } catch 
(Exception e) { - original.setException(e); - doDone(null, false); - } - } - protected void timeout() { Lock lock = this.lock; if (lock.tryLock()) { @@ -596,6 +444,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } } + + protected void doDone(Exchange exchange, boolean forceExhaust) { + if (done.compareAndSet(false, true)) { + MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); + } + } } protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) { @@ -804,10 +658,10 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } // If the multi-cast processor has an aggregation strategy - // then the StreamCache created by the child routes must not be - // closed by the unit of work of the child route, but by the unit of + // then the StreamCache created by the child routes must not be + // closed by the unit of work of the child route, but by the unit of // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). - // Set therefore the unit of work of the parent route as stream cache unit of work, + // Set therefore the unit of work of the parent route as stream cache unit of work, // if it is not already set. 
if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork()); @@ -959,9 +813,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (isParallelProcessing() && executorService == null) { throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); } - if (timeout > 0 && !isParallelProcessing()) { - throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); - } if (timeout > 0 && aggregateExecutorService == null) { // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run