This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 85c2f83ddade82d88d1c842f9747e77a8b8e2ade
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon May 4 10:05:06 2020 +0200

    CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from 
endless stackframes when splitting as it does not collapse its stackframes but 
keep scheduling for next split/task.
---
 .../apache/camel/processor/MulticastProcessor.java | 109 ++++++++++++++++++---
 .../processor/SplitterStreamingUoWIssueTest.java   |  11 ++-
 ...teParallelProcessingStackOverflowIssueTest.java |  50 ++++++++++
 .../SplitAggregateStackOverflowIssueTest.java      |  67 +++++++++++++
 ...itParallelProcessingStackOverflowIssueTest.java |  42 ++++++++
 .../aggregator/SplitStackOverflowIssueTest.java    |  41 ++++++++
 6 files changed, 304 insertions(+), 16 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 1a3c9de..f32ffa8 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -64,7 +64,6 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.KeyValueHolder;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
 import org.slf4j.Logger;
@@ -262,14 +261,15 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             return true;
         }
 
-        MulticastState state = new MulticastState(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            executorService.submit(() -> reactiveExecutor.schedule(state));
+            MulticastParallelTask task = new MulticastParallelTask(exchange, 
pairs, callback);
+            executorService.submit(() -> reactiveExecutor.schedule(task));
         } else {
+            MulticastTask task = new MulticastTask(exchange, pairs, callback);
             if (exchange.isTransacted()) {
-                reactiveExecutor.scheduleSync(state);
+                reactiveExecutor.scheduleSync(task);
             } else {
-                reactiveExecutor.scheduleMain(state);
+                reactiveExecutor.scheduleMain(task);
             }
         }
 
@@ -287,7 +287,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         }
     }
 
-    protected class MulticastState implements Runnable {
+    protected class MulticastTask implements Runnable {
 
         final Exchange original;
         final Iterable<ProcessorExchangePair> pairs;
@@ -301,7 +301,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
 
-        MulticastState(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback) {
+        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -347,12 +347,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                 int index = nbExchangeSent.getAndIncrement();
                 updateNewExchange(exchange, index, pairs, hasNext);
 
-                // Schedule the processing of the next pair
-                if (hasNext) {
-                    if (isParallelProcessing()) {
-                        schedule(this);
-                    }
-                } else {
+                if (!hasNext) {
                     allSent.set(true);
                 }
 
@@ -387,8 +382,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                         // aggregate exchanges if any
                         aggregate();
 
-                        // next step
-                        if (hasNext && !isParallelProcessing()) {
+                        if (hasNext) {
                             schedule(this);
                         }
                     });
@@ -455,6 +449,91 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         }
     }
 
+    protected class MulticastParallelTask extends MulticastTask {
+
+        MulticastParallelTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            super(original, pairs, callback);
+        }
+
+        @Override
+        public String toString() {
+            return "MulticastParallelTask";
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (done.get()) {
+                    return;
+                }
+
+                // Check if the iterator is empty
+                // This can happen the very first time we check the existence
+                // of an item before queuing the run.
+                // or some iterators may return true for hasNext() but then 
null in next()
+                if (!iterator.hasNext()) {
+                    doDone(result.get(), true);
+                    return;
+                }
+
+                ProcessorExchangePair pair = iterator.next();
+                boolean hasNext = iterator.hasNext();
+                // some iterators may return true for hasNext() but then null 
in next()
+                if (pair == null && !hasNext) {
+                    doDone(result.get(), true);
+                    return;
+                }
+
+                Exchange exchange = pair.getExchange();
+                int index = nbExchangeSent.getAndIncrement();
+                updateNewExchange(exchange, index, pairs, hasNext);
+
+                // Schedule the processing of the next pair
+                if (hasNext) {
+                    schedule(this);
+                } else {
+                    allSent.set(true);
+                }
+
+                completion.submit(exchangeResult -> {
+                    // compute time taken if sending to another endpoint
+                    StopWatch watch = beforeSend(pair);
+
+                    AsyncProcessor async = 
AsyncProcessorConverterHelper.convert(pair.getProcessor());
+                    async.process(exchange, doneSync -> {
+                        afterSend(pair, watch);
+
+                        // Decide whether to continue with the multicast or 
not; similar logic to the Pipeline
+                        // remember to test for stop on exception and 
aggregate before copying back results
+                        boolean continueProcessing = 
PipelineHelper.continueProcessing(exchange, "Multicast processing failed for 
number " + index, LOG);
+                        if (stopOnException && !continueProcessing) {
+                            if (exchange.getException() != null) {
+                                // wrap in exception to explain where it failed
+                                exchange.setException(new 
CamelExchangeException("Multicast processing failed for number " + index, 
exchange, exchange.getException()));
+                            } else {
+                                // we want to stop on exception, and the 
exception was handled by the error handler
+                                // this is similar to what the pipeline does, 
so we should do the same to not surprise end users
+                                // so we should set the failed exchange as the 
result and be done
+                                result.set(exchange);
+                            }
+                            // and do the done work
+                            doDone(exchange, true);
+                            return;
+                        }
+
+                        exchangeResult.accept(exchange);
+
+                        // aggregate exchanges if any
+                        aggregate();
+                    });
+                });
+            } catch (Exception e) {
+                original.setException(e);
+                doDone(null, false);
+            }
+        }
+    }
+
     protected void schedule(Executor executor, Runnable runnable, long delay, 
TimeUnit unit) {
         if (executor instanceof ScheduledExecutorService) {
             ((ScheduledExecutorService) executor).schedule(runnable, delay, 
unit);
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java
index 603e9ce..2b4e1b6 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java
@@ -38,6 +38,8 @@ public class SplitterStreamingUoWIssueTest extends 
ContextTestSupport {
 
         template.sendBodyAndHeader("file:target/data/splitter", "A,B,C,D,E", 
Exchange.FILE_NAME, "splitme.txt");
 
+        context.getRouteController().startAllRoutes();
+
         assertMockEndpointsSatisfied();
     }
 
@@ -49,6 +51,8 @@ public class SplitterStreamingUoWIssueTest extends 
ContextTestSupport {
         template.sendBodyAndHeader("file:target/data/splitter", "A,B,C,D,E", 
Exchange.FILE_NAME, "a.txt");
         template.sendBodyAndHeader("file:target/data/splitter", "F,G,H,I", 
Exchange.FILE_NAME, "b.txt");
 
+        context.getRouteController().startAllRoutes();
+
         assertMockEndpointsSatisfied();
     }
 
@@ -57,7 +61,12 @@ public class SplitterStreamingUoWIssueTest extends 
ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("file:target/data/splitter?initialDelay=0&delay=10&delete=true&sortBy=file:name").split(body().tokenize(",")).streaming().to("seda:queue").end()
+                
from("file:target/data/splitter?initialDelay=0&delay=10&delete=true&sortBy=file:name").routeId("start").autoStartup(false)
+                    .log("Start of file ${file:name}")
+                    .split(body().tokenize(",")).streaming().
+                        process(e -> {
+                            
System.out.println(Thread.currentThread().getStackTrace().length);
+                        }).to("seda:queue").end()
                     .log("End of file ${file:name}").to("mock:result");
 
                 from("seda:queue").log("Token: ${body}").to("mock:foo");
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java
new file mode 100644
index 0000000..3104685
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java
@@ -0,0 +1,50 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
+import org.junit.Test;
+
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.Exchange.SPLIT_COMPLETE;
+
+public class SplitAggregateParallelProcessingStackOverflowIssueTest extends 
ContextTestSupport {
+
+    @Test
+    public void testStackoverflow() throws Exception {
+        int size = 50000;
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(size / 10);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++ ) {
+            sb.append("Line #" + i);
+            sb.append("\n");
+        }
+
+        template.sendBody("direct:start", sb);
+
+        MockEndpoint.assertIsSatisfied(60, SECONDS, result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split().tokenize("\n").streaming().parallelProcessing()
+                        .aggregate(constant("foo"), new 
GroupedBodyAggregationStrategy())
+                        .completeAllOnStop()
+                        .eagerCheckCompletion()
+                        .completionSize(10)
+                        .completionTimeout(SECONDS.toMillis(5))
+                        .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+                            .to("log:result?groupSize=100", "mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
new file mode 100644
index 0000000..ad8098e
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java
@@ -0,0 +1,67 @@
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.Exchange.SPLIT_COMPLETE;
+
+@Ignore("TODO: Fix this bug")
+public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport {
+
+    private final AtomicInteger count = new AtomicInteger();
+
+    @Test
+    public void testStackoverflow() throws Exception {
+        int size = 50000;
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(size / 10);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++ ) {
+            sb.append("Line #" + i);
+            sb.append("\n");
+        }
+
+        template.sendBody("direct:start", sb);
+
+        MockEndpoint.assertIsSatisfied(60, SECONDS, result);
+
+        assertTrue("Stackframe must not be too high, was " + count.get(), 
count.get() < 50);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:start")
+                    .split().tokenize("\n").streaming()
+                        .process(e -> {
+                            if (e.getProperty(Exchange.SPLIT_INDEX, 0, 
int.class) % 100 == 0) {
+                                int frames = 
Thread.currentThread().getStackTrace().length;
+                                count.set(frames);
+                                log.info("Stackframe: {}", frames);
+                            }
+                        })
+                        .aggregate(constant("foo"), new 
GroupedBodyAggregationStrategy())
+                        .completeAllOnStop()
+                        .eagerCheckCompletion()
+                        .completionSize(10)
+                        .completionTimeout(SECONDS.toMillis(5))
+                        .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+                            .to("log:result?groupSize=100", "mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java
new file mode 100644
index 0000000..222e8be
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java
@@ -0,0 +1,42 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class SplitParallelProcessingStackOverflowIssueTest extends 
ContextTestSupport {
+
+    @Test
+    public void testStackoverflow() throws Exception {
+        int size = 50000;
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(size);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++ ) {
+            sb.append("Line #" + i);
+            sb.append("\n");
+        }
+
+        template.sendBody("direct:start", sb);
+
+        MockEndpoint.assertIsSatisfied(60, SECONDS, result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split().tokenize("\n").streaming().parallelProcessing()
+                        .to("log:result?groupSize=100", "mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java
new file mode 100644
index 0000000..e85816d
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java
@@ -0,0 +1,41 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class SplitStackOverflowIssueTest extends ContextTestSupport {
+
+    @Test
+    public void testStackoverflow() throws Exception {
+        int size = 50000;
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(size);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++ ) {
+            sb.append("Line #" + i);
+            sb.append("\n");
+        }
+
+        template.sendBody("direct:start", sb);
+
+        MockEndpoint.assertIsSatisfied(60, SECONDS, result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split().tokenize("\n").streaming()
+                        .to("log:result?groupSize=100", "mock:result");
+            }
+        };
+    }
+}

Reply via email to