This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch var-fail in repository https://gitbox.apache.org/repos/asf/camel.git
commit 87d35c17db647fc97e9e2d6b9637eaa3741c3c6f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Mar 23 13:26:05 2024 +0100 CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was processed successfully --- .../java/org/apache/camel/processor/Enricher.java | 10 +- .../org/apache/camel/processor/PollEnricher.java | 9 +- .../camel/processor/EnrichVariableErrorTest.java | 291 +++++++++++++++++++++ .../processor/PollEnrichVariableErrorTest.java | 112 ++++++++ 4 files changed, 413 insertions(+), 9 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 97d757a5f99..9424fe89a61 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -222,12 +222,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { - if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) { + if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) { // result should be stored in variable instead of message body - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, - exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive, + aggregatedExchange.getMessage()); + aggregatedExchange.getMessage().setBody(originalBody); + aggregatedExchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsWithoutCorrelationId(exchange, aggregatedExchange); diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index f6cac002f3d..e13ec57ed88 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -350,11 +350,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // must catch any exception from aggregation Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { - if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) { + if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) { // result should be stored in variable instead of message body - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive, + aggregatedExchange.getMessage()); + aggregatedExchange.getMessage().setBody(originalBody); + aggregatedExchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, aggregatedExchange); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java new file mode 100644 index 00000000000..3c199057eae --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class EnrichVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + // TODO: should this be World or Bye World? 
+ Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTryCatch() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .doTry() + .throwException(new IllegalArgumentException("Forced")) + .doCatch(Exception.class) + .setBody(simple("Catch: ${body}")) + .end(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertTrue(out.hasVariables()); + Assertions.assertEquals("World", out.getMessage().getBody()); + Assertions.assertEquals("Catch: Bye World", out.getVariable("bye")); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(true) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionNotHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + 
public void configure() throws Exception { + onException(Exception.class) + .handled(false) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDeadLetterChannel() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDefaultErrorHandler() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler()); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new 
IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .stop(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollback() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .rollback(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + 
from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnly(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackOnlyLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnlyLast(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java new file mode 100644 index 00000000000..c1dd22ff715 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.seda.SedaEndpoint; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PollEnrichVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + template.send("seda:foo", e -> { + e.getMessage().setBody("Bye World"); + e.setException(new IllegalArgumentException()); + }); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + 
from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class); + Exchange ex = se.createExchange(); + ex.getMessage().setBody("Bye World"); + ex.setRouteStop(true); + se.getQueue().add(ex); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertTrue(out.isRouteStop()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollbackOnly() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class); + Exchange ex = se.createExchange(); + ex.getMessage().setBody("Bye World"); + ex.setRollbackOnly(true); + se.getQueue().add(ex); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertTrue(out.isRollbackOnly()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +}