This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 85c2f83ddade82d88d1c842f9747e77a8b8e2ade Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 4 10:05:06 2020 +0200 CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from endless stackframes when splitting as it does not collapse its stackframes but keeps scheduling for the next split/task. --- .../apache/camel/processor/MulticastProcessor.java | 109 ++++++++++++++++++--- .../processor/SplitterStreamingUoWIssueTest.java | 11 ++- ...teParallelProcessingStackOverflowIssueTest.java | 50 ++++++++++ .../SplitAggregateStackOverflowIssueTest.java | 67 +++++++++++++ ...itParallelProcessingStackOverflowIssueTest.java | 42 ++++++++ .../aggregator/SplitStackOverflowIssueTest.java | 41 ++++++++ 6 files changed, 304 insertions(+), 16 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 1a3c9de..f32ffa8 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -64,7 +64,6 @@ import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IOHelper; import org.apache.camel.util.KeyValueHolder; -import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AsyncCompletionService; import org.slf4j.Logger; @@ -262,14 +261,15 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat return true; } - MulticastState state = new MulticastState(exchange, pairs, callback); if (isParallelProcessing()) { - executorService.submit(() -> reactiveExecutor.schedule(state)); + MulticastParallelTask task = new MulticastParallelTask(exchange, pairs, callback); + executorService.submit(() -> reactiveExecutor.schedule(task)); } else { + MulticastTask task = new 
MulticastTask(exchange, pairs, callback); if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(state); + reactiveExecutor.scheduleSync(task); } else { - reactiveExecutor.scheduleMain(state); + reactiveExecutor.scheduleMain(task); } } @@ -287,7 +287,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } - protected class MulticastState implements Runnable { + protected class MulticastTask implements Runnable { final Exchange original; final Iterable<ProcessorExchangePair> pairs; @@ -301,7 +301,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat final AtomicBoolean allSent = new AtomicBoolean(); final AtomicBoolean done = new AtomicBoolean(); - MulticastState(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { + MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { this.original = original; this.pairs = pairs; this.callback = callback; @@ -347,12 +347,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat int index = nbExchangeSent.getAndIncrement(); updateNewExchange(exchange, index, pairs, hasNext); - // Schedule the processing of the next pair - if (hasNext) { - if (isParallelProcessing()) { - schedule(this); - } - } else { + if (!hasNext) { allSent.set(true); } @@ -387,8 +382,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat // aggregate exchanges if any aggregate(); - // next step - if (hasNext && !isParallelProcessing()) { + if (hasNext) { schedule(this); } }); @@ -455,6 +449,91 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } + protected class MulticastParallelTask extends MulticastTask { + + MulticastParallelTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { + super(original, pairs, callback); + } + + @Override + public String toString() { + return 
"MulticastParallelTask"; + } + + @Override + public void run() { + try { + if (done.get()) { + return; + } + + // Check if the iterator is empty + // This can happen the very first time we check the existence + // of an item before queuing the run. + // or some iterators may return true for hasNext() but then null in next() + if (!iterator.hasNext()) { + doDone(result.get(), true); + return; + } + + ProcessorExchangePair pair = iterator.next(); + boolean hasNext = iterator.hasNext(); + // some iterators may return true for hasNext() but then null in next() + if (pair == null && !hasNext) { + doDone(result.get(), true); + return; + } + + Exchange exchange = pair.getExchange(); + int index = nbExchangeSent.getAndIncrement(); + updateNewExchange(exchange, index, pairs, hasNext); + + // Schedule the processing of the next pair + if (hasNext) { + schedule(this); + } else { + allSent.set(true); + } + + completion.submit(exchangeResult -> { + // compute time taken if sending to another endpoint + StopWatch watch = beforeSend(pair); + + AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor()); + async.process(exchange, doneSync -> { + afterSend(pair, watch); + + // Decide whether to continue with the multicast or not; similar logic to the Pipeline + // remember to test for stop on exception and aggregate before copying back results + boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Multicast processing failed for number " + index, LOG); + if (stopOnException && !continueProcessing) { + if (exchange.getException() != null) { + // wrap in exception to explain where it failed + exchange.setException(new CamelExchangeException("Multicast processing failed for number " + index, exchange, exchange.getException())); + } else { + // we want to stop on exception, and the exception was handled by the error handler + // this is similar to what the pipeline does, so we should do the same to not surprise end users + // so we should set 
the failed exchange as the result and be done + result.set(exchange); + } + // and do the done work + doDone(exchange, true); + return; + } + + exchangeResult.accept(exchange); + + // aggregate exchanges if any + aggregate(); + }); + }); + } catch (Exception e) { + original.setException(e); + doDone(null, false); + } + } + } + protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { ((ScheduledExecutorService) executor).schedule(runnable, delay, unit); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java index 603e9ce..2b4e1b6 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamingUoWIssueTest.java @@ -38,6 +38,8 @@ public class SplitterStreamingUoWIssueTest extends ContextTestSupport { template.sendBodyAndHeader("file:target/data/splitter", "A,B,C,D,E", Exchange.FILE_NAME, "splitme.txt"); + context.getRouteController().startAllRoutes(); + assertMockEndpointsSatisfied(); } @@ -49,6 +51,8 @@ public class SplitterStreamingUoWIssueTest extends ContextTestSupport { template.sendBodyAndHeader("file:target/data/splitter", "A,B,C,D,E", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader("file:target/data/splitter", "F,G,H,I", Exchange.FILE_NAME, "b.txt"); + context.getRouteController().startAllRoutes(); + assertMockEndpointsSatisfied(); } @@ -57,7 +61,12 @@ public class SplitterStreamingUoWIssueTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("file:target/data/splitter?initialDelay=0&delay=10&delete=true&sortBy=file:name").split(body().tokenize(",")).streaming().to("seda:queue").end() + 
from("file:target/data/splitter?initialDelay=0&delay=10&delete=true&sortBy=file:name").routeId("start").autoStartup(false) + .log("Start of file ${file:name}") + .split(body().tokenize(",")).streaming(). + process(e -> { + System.out.println(Thread.currentThread().getStackTrace().length); + }).to("seda:queue").end() .log("End of file ${file:name}").to("mock:result"); from("seda:queue").log("Token: ${body}").to("mock:foo"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java new file mode 100644 index 0000000..3104685 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateParallelProcessingStackOverflowIssueTest.java @@ -0,0 +1,50 @@ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy; +import org.junit.Test; + + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.camel.Exchange.SPLIT_COMPLETE; + +public class SplitAggregateParallelProcessingStackOverflowIssueTest extends ContextTestSupport { + + @Test + public void testStackoverflow() throws Exception { + int size = 50000; + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(size / 10); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size; i++ ) { + sb.append("Line #" + i); + sb.append("\n"); + } + + template.sendBody("direct:start", sb); + + MockEndpoint.assertIsSatisfied(60, SECONDS, result); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception 
{ + from("direct:start") + .split().tokenize("\n").streaming().parallelProcessing() + .aggregate(constant("foo"), new GroupedBodyAggregationStrategy()) + .completeAllOnStop() + .eagerCheckCompletion() + .completionSize(10) + .completionTimeout(SECONDS.toMillis(5)) + .completionPredicate(exchangeProperty(SPLIT_COMPLETE)) + .to("log:result?groupSize=100", "mock:result"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java new file mode 100644 index 0000000..ad8098e --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateStackOverflowIssueTest.java @@ -0,0 +1,67 @@ +package org.apache.camel.processor.aggregator; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy; +import org.junit.Ignore; +import org.junit.Test; + + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.camel.Exchange.SPLIT_COMPLETE; + +@Ignore("TODO: Fix this bug") +public class SplitAggregateStackOverflowIssueTest extends ContextTestSupport { + + private final AtomicInteger count = new AtomicInteger(); + + @Test + public void testStackoverflow() throws Exception { + int size = 50000; + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(size / 10); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size; i++ ) { + sb.append("Line #" + i); + sb.append("\n"); + } + + template.sendBody("direct:start", sb); + + MockEndpoint.assertIsSatisfied(60, SECONDS, result); + + assertTrue("Stackframe must not be too high, was " + count.get(), 
count.get() < 50); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + from("direct:start") + .split().tokenize("\n").streaming() + .process(e -> { + if (e.getProperty(Exchange.SPLIT_INDEX, 0, int.class) % 100 == 0) { + int frames = Thread.currentThread().getStackTrace().length; + count.set(frames); + log.info("Stackframe: {}", frames); + } + }) + .aggregate(constant("foo"), new GroupedBodyAggregationStrategy()) + .completeAllOnStop() + .eagerCheckCompletion() + .completionSize(10) + .completionTimeout(SECONDS.toMillis(5)) + .completionPredicate(exchangeProperty(SPLIT_COMPLETE)) + .to("log:result?groupSize=100", "mock:result"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java new file mode 100644 index 0000000..222e8be --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitParallelProcessingStackOverflowIssueTest.java @@ -0,0 +1,42 @@ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class SplitParallelProcessingStackOverflowIssueTest extends ContextTestSupport { + + @Test + public void testStackoverflow() throws Exception { + int size = 50000; + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(size); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size; i++ ) { + sb.append("Line #" + i); + sb.append("\n"); + } + + template.sendBody("direct:start", sb); + + MockEndpoint.assertIsSatisfied(60, SECONDS, 
result); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .split().tokenize("\n").streaming().parallelProcessing() + .to("log:result?groupSize=100", "mock:result"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java new file mode 100644 index 0000000..e85816d --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitStackOverflowIssueTest.java @@ -0,0 +1,41 @@ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class SplitStackOverflowIssueTest extends ContextTestSupport { + + @Test + public void testStackoverflow() throws Exception { + int size = 50000; + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(size); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size; i++ ) { + sb.append("Line #" + i); + sb.append("\n"); + } + + template.sendBody("direct:start", sb); + + MockEndpoint.assertIsSatisfied(60, SECONDS, result); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .split().tokenize("\n").streaming() + .to("log:result?groupSize=100", "mock:result"); + } + }; + } +}