Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x 0462bc13e -> 4125e9d86


Adding ability to REQUEUE a message on RabbitMQ server instead of REJECTING 
message / sending it to DLQ.

Conflicts:
        components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99d8831c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99d8831c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99d8831c

Branch: refs/heads/camel-2.14.x
Commit: 99d8831ce6921eb2cce816486550260235b3d524
Parents: 0462bc1
Author: Andrew Austin <andrew.aus...@wgu.edu>
Authored: Sat Feb 21 17:16:24 2015 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 22 16:28:30 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConstants.java   |   1 +
 .../component/rabbitmq/RabbitMQConsumer.java    |  10 +-
 .../rabbitmq/RabbitMQRequeueIntTest.java        | 111 +++++++++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index 383ac5d..b1f4a0c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -34,6 +34,7 @@ public final class RabbitMQConstants {
     public static final String EXPIRATION = "rabbitmq.EXPIRATION";
     public static final String TIMESTAMP = "rabbitmq.TIMESTAMP";
     public static final String APP_ID = "rabbitmq.APP_ID";
+    public static final String REQUEUE = "rabbitmq.REQUEUE";
     public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
     public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
     

http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a8dd785..9badabd 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -28,6 +28,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
@@ -197,9 +198,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     channel.basicAck(deliveryTag, false);
                 }
             } else {
+                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+                boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
-                    channel.basicReject(deliveryTag, false);
+                    log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet);
+                    if (isRequeueHeaderSet) {
+                        channel.basicReject(deliveryTag, true);
+                    } else {
+                        channel.basicReject(deliveryTag, false);
+                    }
                 }
                 if (exchange.getException() != null) {
                     getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());

http://git-wip-us.apache.org/repos/asf/camel/blob/99d8831c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
new file mode 100644
index 0000000..534f5b4
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQRequeueIntTest.java
@@ -0,0 +1,111 @@
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Integration test to confirm REQUEUE header causes message to be re-queued instead of sent to DLQ.
+ *
+ * Created by Andrew Austin on 2/21/15.
+ */
+public class RabbitMQRequeueIntTest extends CamelTestSupport {
+    public static final String ROUTING_KEY = "rk4";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+            + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .inOnly(consumingMockEndpoint)
+                        .choice()
+                        .when(body().isEqualTo("requeue header false"))
+                            .log("Setting REQUEUE flag to false")
+                            .setHeader(RabbitMQConstants.REQUEUE, constant(false))
+                        .when(body().isEqualTo("requeue header true"))
+                            .log("Setting REQUEUE flag to true")
+                            .setHeader(RabbitMQConstants.REQUEUE, constant(true))
+                        .when(body().isEqualTo("non-boolean header"))
+                                .log("Setting REQUEUE flag to non-boolean")
+                                .setHeader(RabbitMQConstants.REQUEUE, constant(4l))
+                        .end()
+                        .throwException(new Exception("Simulated exception"));
+            }
+        };
+    }
+
+    @Test
+    public void testNoRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+
+        directProducer.sendBody("no requeue header");
+
+        Thread.sleep(100);
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testNonBooleanRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+
+        directProducer.sendBody("non-boolean header");
+
+        Thread.sleep(100);
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testFalseRequeueHeaderCausesReject() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.expectedMessageCount(1);
+        // The body must match the consuming route's "requeue header false" branch so that REQUEUE=false is set.
+        directProducer.sendBody("requeue header false");
+
+        Thread.sleep(100);
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTrueRequeueHeaderCausesRequeue() throws Exception {
+        producingMockEndpoint.expectedMessageCount(1);
+        consumingMockEndpoint.setMinimumExpectedMessageCount(2);
+
+        directProducer.sendBody("requeue header true");
+
+        Thread.sleep(100);
+        producingMockEndpoint.assertIsSatisfied();
+        consumingMockEndpoint.assertIsSatisfied();
+    }
+}

Reply via email to