Repository: camel Updated Branches: refs/heads/camel-2.14.x 0462bc13e -> 4125e9d86
Adding ability to REQUEUE a message on RabbitMQ server instead of REJECTING message / sending it to DLQ. Conflicts: components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99d8831c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99d8831c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99d8831c Branch: refs/heads/camel-2.14.x Commit: 99d8831ce6921eb2cce816486550260235b3d524 Parents: 0462bc1 Author: Andrew Austin <andrew.aus...@wgu.edu> Authored: Sat Feb 21 17:16:24 2015 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 22 16:28:30 2015 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConstants.java | 1 + .../component/rabbitmq/RabbitMQConsumer.java | 10 +- .../rabbitmq/RabbitMQRequeueIntTest.java | 111 +++++++++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index 383ac5d..b1f4a0c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -34,6 +34,7 @@ public final class RabbitMQConstants { public static final String EXPIRATION = "rabbitmq.EXPIRATION"; public static final String TIMESTAMP = "rabbitmq.TIMESTAMP"; public static final String 
APP_ID = "rabbitmq.APP_ID"; + public static final String REQUEUE = "rabbitmq.REQUEUE"; public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange"; public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index a8dd785..9badabd 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -28,6 +28,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; @@ -197,9 +198,16 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicAck(deliveryTag, false); } } else { + Message msg = exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); + boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { - channel.basicReject(deliveryTag, false); + log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet); + if (isRequeueHeaderSet) { + channel.basicReject(deliveryTag, true); + } else { + channel.basicReject(deliveryTag, false); + } } if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java new file mode 100644 index 0000000..534f5b4 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java @@ -0,0 +1,111 @@ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Integration test to confirm REQUEUE header causes message to be re-queued instead of sent to DLQ. + * + * Created by Andrew Austin on 2/21/15. 
+ */
+public class RabbitMQRequeueIntTest extends CamelTestSupport {
+    public static final String ROUTING_KEY = "rk4";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+            + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint; // autoAck=false, so the consumer has to ack or reject each delivery itself
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint; // counts messages that passed through the producing route
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint; // counts deliveries seen by the consuming route
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .inOnly(consumingMockEndpoint)
+                        .choice() // the message body selects what, if anything, is stored in the REQUEUE header
+                        .when(body().isEqualTo("requeue header false"))
+                        .log("Setting REQUEUE flag to false")
+                        .setHeader(RabbitMQConstants.REQUEUE, constant(false))
+                        .when(body().isEqualTo("requeue header true"))
+                        .log("Setting REQUEUE flag to true")
+                        .setHeader(RabbitMQConstants.REQUEUE, constant(true))
+                        .when(body().isEqualTo("non-boolean header"))
+                        .log("Setting REQUEUE flag to non-boolean")
+                        .setHeader(RabbitMQConstants.REQUEUE, constant(4L)) // deliberately a long, not a boolean
+                        .end()
+                        .throwException(new Exception("Simulated exception")); // every exchange fails after the choice
+            }
+        };
+    }
+
+    @Test
+    public void testNoRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1); // exactly one delivery expected
+
+        directProducer.sendBody("no requeue header"); // matches no when() branch, so REQUEUE is never set
+
+        Thread.sleep(100); // presumably leaves time for an unwanted redelivery to show up - confirm
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testNonBooleanRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1); // exactly one delivery expected
+
+        directProducer.sendBody("non-boolean header"); // route stores the long 4L in REQUEUE
+
+        Thread.sleep(100); // presumably leaves time for an unwanted redelivery to show up - confirm
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testFalseRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1); // exactly one delivery expected
+
+        directProducer.sendBody("requeue header false"); // route stores false in REQUEUE
+
+        Thread.sleep(100); // presumably leaves time for an unwanted redelivery to show up - confirm
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTrueRequeueHeaderCausesRequeue() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.setMinimumExpectedMessageCount(2); // the same message must reach the consumer at least twice
+
+        directProducer.sendBody("requeue header true"); // route stores true in REQUEUE
+
+        Thread.sleep(100); // presumably leaves time for the redelivery to arrive - confirm
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+}