This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit adb3c4b1496ddba907b095a41603858a3a7412af
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon May 4 17:24:39 2020 +0200

    CAMEL-14996: Okay after diving more into this then CAMEL-15008 fixed the 
root problem so we dont need to separate multicast eip into parallel vs 
sequential task to fix the endless stack frame. So reverting this code back and 
a little polish.
---
 .../apache/camel/processor/MulticastProcessor.java | 207 +++------------------
 1 file changed, 29 insertions(+), 178 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index d0b1e2c..7d542ae 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
@@ -177,9 +176,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                               ExecutorService executorService, boolean 
shutdownExecutorService, boolean streaming, boolean stopOnException, long 
timeout, Processor onPrepare,
                               boolean shareUnitOfWork, boolean 
parallelAggregate) {
         this(camelContext, route, processors, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService, streaming, 
stopOnException, timeout, onPrepare,
-             shareUnitOfWork, parallelAggregate, false);
+                shareUnitOfWork, parallelAggregate, false);
     }
-    
+
     public MulticastProcessor(CamelContext camelContext, Route route, 
Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService, boolean streaming,
                               boolean stopOnException, long timeout, Processor 
onPrepare, boolean shareUnitOfWork,
@@ -260,15 +259,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             return true;
         }
 
+        MulticastTask state = new MulticastTask(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            MulticastParallelTask task = new MulticastParallelTask(exchange, 
pairs, callback);
-            executorService.submit(() -> reactiveExecutor.schedule(task));
+            executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
-            MulticastTask task = new MulticastTask(exchange, pairs, callback);
             if (exchange.isTransacted()) {
-                reactiveExecutor.scheduleSync(task);
+                reactiveExecutor.scheduleSync(state);
             } else {
-                reactiveExecutor.scheduleMain(task);
+                reactiveExecutor.scheduleMain(state);
             }
         }
 
@@ -286,66 +284,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         }
     }
 
-    private interface MulticastCompletionService {
-
-        Exchange poll();
-
-        Exchange pollUnordered();
-
-        void submit(Consumer<Consumer<Exchange>> runner);
-
-    }
-
-    private class MulticastCompletionServiceParallelTask implements 
MulticastCompletionService {
-        private final AsyncCompletionService<Exchange> completion;
-
-        public MulticastCompletionServiceParallelTask(ReentrantLock lock) {
-            this.completion = new 
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), 
lock);;
-        }
-
-        @Override
-        public Exchange poll() {
-            return completion.poll();
-        }
-
-        @Override
-        public Exchange pollUnordered() {
-            return completion.pollUnordered();
-        }
-
-        @Override
-        public void submit(Consumer<Consumer<Exchange>> runner) {
-            completion.submit(runner);
-        }
-    }
-
-    private class MulticastCompletionServiceTask implements 
MulticastCompletionService {
-
-        private final AtomicReference<Exchange> exchange = new 
AtomicReference<>();
-
-        public MulticastCompletionServiceTask() {
-        }
-
-        @Override
-        public Exchange poll() {
-            return exchange.getAndSet(null);
-        }
-
-        @Override
-        public Exchange pollUnordered() {
-            return exchange.getAndSet(null);
-        }
-
-        @Override
-        public void submit(Consumer<Consumer<Exchange>> runner) {
-            runner.accept(this::setResult);
-        }
-
-        private void setResult(Exchange result) {
-            this.exchange.set(result);
-        }
-    }
-
     protected class MulticastTask implements Runnable {
 
         final Exchange original;
@@ -353,7 +291,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         final AsyncCallback callback;
         final Iterator<ProcessorExchangePair> iterator;
         final ReentrantLock lock;
-        MulticastCompletionService completion;
+        final AsyncCompletionService<Exchange> completion;
         final AtomicReference<Exchange> result;
         final AtomicInteger nbExchangeSent = new AtomicInteger();
         final AtomicInteger nbAggregated = new AtomicInteger();
@@ -366,8 +304,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             this.callback = callback;
             this.iterator = pairs.iterator();
             this.lock = new ReentrantLock();
-            this.completion = new MulticastCompletionServiceTask();
+            this.completion = new 
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), 
lock);
             this.result = new AtomicReference<>();
+            if (timeout > 0) {
+                schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
+            }
         }
 
         @Override
@@ -375,10 +316,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             return "MulticastTask";
         }
 
-        private Exchange completionPoll() {
-            return completion.poll();
-        }
-
         @Override
         public void run() {
             try {
@@ -407,7 +344,12 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                 int index = nbExchangeSent.getAndIncrement();
                 updateNewExchange(exchange, index, pairs, hasNext);
 
-                if (!hasNext) {
+                // Schedule the processing of the next pair
+                if (hasNext) {
+                    if (isParallelProcessing()) {
+                        schedule(this);
+                    }
+                } else {
                     allSent.set(true);
                 }
 
@@ -442,7 +384,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                         // aggregate exchanges if any
                         aggregate();
 
-                        if (hasNext) {
+                        // next step
+                        if (hasNext && !isParallelProcessing()) {
                             schedule(this);
                         }
                     });
@@ -458,7 +401,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             if (lock.tryLock()) {
                 try {
                     Exchange exchange;
-                    while (!done.get() && (exchange = completionPoll()) != 
null) {
+                    while (!done.get() && (exchange = completion.poll()) != 
null) {
                         doAggregate(result, exchange, original);
                         if (nbAggregated.incrementAndGet() >= 
nbExchangeSent.get() && allSent.get()) {
                             doDone(result.get(), true);
@@ -474,101 +417,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             }
         }
 
-        protected void doDone(Exchange exchange, boolean forceExhaust) {
-            if (done.compareAndSet(false, true)) {
-                MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
-            }
-        }
-    }
-
-    protected class MulticastParallelTask extends MulticastTask {
-
-        MulticastParallelTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
-            super(original, pairs, callback);
-            this.completion = new MulticastCompletionServiceParallelTask(lock);
-            if (timeout > 0) {
-                schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "MulticastParallelTask";
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (done.get()) {
-                    return;
-                }
-
-                // Check if the iterator is empty
-                // This can happen the very first time we check the existence
-                // of an item before queuing the run.
-                // or some iterators may return true for hasNext() but then 
null in next()
-                if (!iterator.hasNext()) {
-                    doDone(result.get(), true);
-                    return;
-                }
-
-                ProcessorExchangePair pair = iterator.next();
-                boolean hasNext = iterator.hasNext();
-                // some iterators may return true for hasNext() but then null 
in next()
-                if (pair == null && !hasNext) {
-                    doDone(result.get(), true);
-                    return;
-                }
-
-                Exchange exchange = pair.getExchange();
-                int index = nbExchangeSent.getAndIncrement();
-                updateNewExchange(exchange, index, pairs, hasNext);
-
-                // Schedule the processing of the next pair
-                if (hasNext) {
-                    schedule(this);
-                } else {
-                    allSent.set(true);
-                }
-
-                completion.submit(exchangeResult -> {
-                    // compute time taken if sending to another endpoint
-                    StopWatch watch = beforeSend(pair);
-
-                    AsyncProcessor async = 
AsyncProcessorConverterHelper.convert(pair.getProcessor());
-                    async.process(exchange, doneSync -> {
-                        afterSend(pair, watch);
-
-                        // Decide whether to continue with the multicast or 
not; similar logic to the Pipeline
-                        // remember to test for stop on exception and 
aggregate before copying back results
-                        boolean continueProcessing = 
PipelineHelper.continueProcessing(exchange, "Multicast processing failed for 
number " + index, LOG);
-                        if (stopOnException && !continueProcessing) {
-                            if (exchange.getException() != null) {
-                                // wrap in exception to explain where it failed
-                                exchange.setException(new 
CamelExchangeException("Multicast processing failed for number " + index, 
exchange, exchange.getException()));
-                            } else {
-                                // we want to stop on exception, and the 
exception was handled by the error handler
-                                // this is similar to what the pipeline does, 
so we should do the same to not surprise end users
-                                // so we should set the failed exchange as the 
result and be done
-                                result.set(exchange);
-                            }
-                            // and do the done work
-                            doDone(exchange, true);
-                            return;
-                        }
-
-                        exchangeResult.accept(exchange);
-
-                        // aggregate exchanges if any
-                        aggregate();
-                    });
-                });
-            } catch (Exception e) {
-                original.setException(e);
-                doDone(null, false);
-            }
-        }
-
         protected void timeout() {
             Lock lock = this.lock;
             if (lock.tryLock()) {
@@ -596,6 +444,12 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                 }
             }
         }
+
+        protected void doDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
+            }
+        }
     }
 
     protected void schedule(Executor executor, Runnable runnable, long delay, 
TimeUnit unit) {
@@ -804,10 +658,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             }
 
             // If the multi-cast processor has an aggregation strategy
-            // then the StreamCache created by the child routes must not be 
-            // closed by the unit of work of the child route, but by the unit 
of 
+            // then the StreamCache created by the child routes must not be
+            // closed by the unit of work of the child route, but by the unit 
of
             // work of the parent route or grand parent route or grand grand 
parent route ...(in case of nesting).
-            // Set therefore the unit of work of the  parent route as stream 
cache unit of work, 
+            // Set therefore the unit of work of the  parent route as stream 
cache unit of work,
             // if it is not already set.
             if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
                 copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, 
exchange.getUnitOfWork());
@@ -959,9 +813,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         if (isParallelProcessing() && executorService == null) {
             throw new IllegalArgumentException("ParallelProcessing is enabled 
but ExecutorService has not been set");
         }
-        if (timeout > 0 && !isParallelProcessing()) {
-            throw new IllegalArgumentException("Timeout is used but 
ParallelProcessing has not been enabled");
-        }
         if (timeout > 0 && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly 
task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the 
aggregate task may not be able to run

Reply via email to