This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.21.x by this push: new 9fbb509 CAMEL-12446: Splitter - Make it easier to turn off propgate exception 9fbb509 is described below commit 9fbb5095ef99f9b4723216a954059b6c7102ae4b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 16 11:41:34 2018 +0200 CAMEL-12446: Splitter - Make it easier to turn off propgate exception --- .../java/org/apache/camel/processor/Splitter.java | 8 ++ .../aggregate/UseOriginalAggregationStrategy.java | 10 ++- .../camel/util/toolbox/AggregationStrategies.java | 11 +++ .../org/apache/camel/processor/SplitterTest.java | 1 + ...litterUseOriginalNotPropagateExceptionTest.java | 92 ++++++++++++++++++++++ 5 files changed, 121 insertions(+), 1 deletion(-) diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 8a06f79..fb56485 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -100,6 +100,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac public boolean process(Exchange exchange, final AsyncCallback callback) { final AggregationStrategy strategy = getAggregationStrategy(); + // set original exchange if not already pre-configured + if (strategy instanceof UseOriginalAggregationStrategy) { + UseOriginalAggregationStrategy original = (UseOriginalAggregationStrategy) strategy; + if (original.getOriginal() == null) { + original.setOriginal(exchange); + } + } + // if no custom aggregation strategy is being used then fallback to keep the original // and propagate exceptions which is done by a per exchange specific aggregation strategy // to ensure it supports async routing diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java index 764151f..dbd60c5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java @@ -28,7 +28,7 @@ import org.apache.camel.Exchange; */ public class UseOriginalAggregationStrategy implements AggregationStrategy { - private final Exchange original; + private Exchange original; private final boolean propagateException; public UseOriginalAggregationStrategy() { @@ -64,6 +64,14 @@ public class UseOriginalAggregationStrategy implements AggregationStrategy { } } + public Exchange getOriginal() { + return original; + } + + public void setOriginal(Exchange original) { + this.original = original; + } + @Override public String toString() { return "UseOriginalAggregationStrategy"; diff --git a/camel-core/src/main/java/org/apache/camel/util/toolbox/AggregationStrategies.java b/camel-core/src/main/java/org/apache/camel/util/toolbox/AggregationStrategies.java index a601598..ac68858 100644 --- a/camel-core/src/main/java/org/apache/camel/util/toolbox/AggregationStrategies.java +++ b/camel-core/src/main/java/org/apache/camel/util/toolbox/AggregationStrategies.java @@ -70,6 +70,17 @@ public final class AggregationStrategies { } /** + * Use the original exchange. + * + * @param propagateException whether to propagate the exception if an error was thrown while processing the split messages. + * + * @see org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy + */ + public static AggregationStrategy useOriginal(boolean propagateException) { + return new UseOriginalAggregationStrategy(null, propagateException); + } + + /** * Creates a {@link GroupedExchangeAggregationStrategy} aggregation strategy.
*/ public static AggregationStrategy groupedExchange() { diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java index d84c55e..5a2fab3 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java @@ -32,6 +32,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; /** diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterUseOriginalNotPropagateExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterUseOriginalNotPropagateExceptionTest.java new file mode 100644 index 0000000..ca03096 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterUseOriginalNotPropagateExceptionTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.util.EventObject; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.management.event.ExchangeFailedEvent; +import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.util.toolbox.AggregationStrategies; + +public class SplitterUseOriginalNotPropagateExceptionTest extends ContextTestSupport { + + private MyEventNotifier notifier = new MyEventNotifier(); + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().addEventNotifier(notifier); + return context; + } + + public void testUseOriginalNotPropgateException() throws Exception { + assertEquals(0, notifier.getErrors()); + + getMockEndpoint("mock:line").expectedBodiesReceived("Hello", "World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hello,Kaboom,World"); + + try { + template.sendBody("direct:start", "Hello,Kaboom,World"); + } catch (Exception e) { + fail("Should not fail"); + } + + assertMockEndpointsSatisfied(); + + // there should only be 1 error as we do not propagate errors to the parent + assertEquals(1, notifier.getErrors()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .split(body()).aggregationStrategy(AggregationStrategies.useOriginal(false)) + .filter(simple("${body} == 'Kaboom'")) + .throwException(new IllegalArgumentException("Forced error")) + .end() + .to("mock:line") + .end() + .to("mock:result"); + } + }; + } + + private static class MyEventNotifier extends EventNotifierSupport { + + private int errors; + + @Override + public void notify(EventObject event) throws Exception { + errors++; + } + + @Override + public boolean isEnabled(EventObject event) 
{ + return event instanceof ExchangeFailedEvent; + } + + public int getErrors() { + return errors; + } + } +} -- To stop receiving notification emails like this one, please contact davscl...@apache.org.