This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1560602faef08e1cf8944fe42160b7f864c5dbc2
Author: Robin Vanderhallen <rob...@aviovision.com>
AuthorDate: Thu Dec 19 16:49:57 2019 +0100

    CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange
---
 .../component/rabbitmq/RabbitMQDeclareSupport.java |   5 +-
 .../RabbitMQDeadLetterRoutingKeyIntTest.java       | 168 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
index a805824..e2c3805 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -90,7 +90,10 @@ public class RabbitMQDeclareSupport {
     private void populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
         if (endpoint.getDeadLetterExchange() != null) {
             queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange());
-            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+
+            if (endpoint.getDeadLetterRoutingKey() != null) {
+                queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+            }
         }
     }

diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
new file mode 100644
index 0000000..70fa772
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor
license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for the routing key carried by dead-lettered messages.
+ * <p>
+ * Two consumer routes are declared against a broker on {@code localhost:5672}: one without a
+ * {@code deadLetterRoutingKey} option and one with {@code deadLetterRoutingKey=rk2}. Every exchange
+ * reaching the mock endpoint is made to fail, and the message that subsequently shows up on the
+ * dead letter queue {@code dlq} is inspected for its routing key.
+ */
+public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDeadLetterRoutingKeyIntTest.class);
+
+    /** Upper bound, in seconds, for waiting until a message shows up on the dead letter queue. */
+    private static final long DELIVERY_TIMEOUT_SECONDS = 5;
+
+    /** Consumer endpoint that configures a dead letter exchange but no dead letter routing key. */
+    private static final String CONSUMER = "rabbitmq:ex9?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+        + "&skipExchangeDeclare=false"
+        + "&skipQueueDeclare=false"
+        + "&autoDelete=false"
+        + "&durable=true"
+        + "&autoAck=false"
+        + "&queue=q9"
+        + "&routingKey=rk1"
+        + "&deadLetterExchange=dlx"
+        + "&deadLetterQueue=dlq"
+        + "&deadLetterExchangeType=fanout";
+
+    /** Consumer endpoint that additionally sets {@code deadLetterRoutingKey=rk2}. */
+    private static final String CONSUMER_WITH_DEADLETTER_ROUTING_KEY = "rabbitmq:ex10?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+        + "&skipExchangeDeclare=false"
+        + "&skipQueueDeclare=false"
+        + "&autoDelete=false&durable=true"
+        + "&autoAck=false&queue=q10"
+        + "&routingKey=rk1"
+        + "&deadLetterExchange=dlx"
+        + "&deadLetterQueue=dlq"
+        + "&deadLetterExchangeType=fanout"
+        + "&deadLetterRoutingKey=rk2";
+
+    private Connection connection;
+    private Channel channel;
+    private Channel deadLetterChannel;
+
+    @EndpointInject(uri = "mock:received")
+    private MockEndpoint receivedEndpoint;
+
+    @Produce(uri = "direct:start")
+    private ProducerTemplate template;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(CONSUMER).to(receivedEndpoint);
+                from(CONSUMER_WITH_DEADLETTER_ROUTING_KEY).to(receivedEndpoint);
+            }
+        };
+    }
+
+    @Before
+    public void setUpRabbitMQ() throws Exception {
+        connection = connection();
+        channel = connection.createChannel();
+        deadLetterChannel = connection.createChannel();
+    }
+
+    @After
+    public void tearDownRabbitMQ() throws Exception {
+        // Null checks keep a failure in setUpRabbitMQ() from being masked by a NullPointerException here.
+        if (channel != null) {
+            channel.abort();
+        }
+        if (deadLetterChannel != null) {
+            deadLetterChannel.abort();
+        }
+        if (connection != null) {
+            connection.abort();
+        }
+    }
+
+    /** Without a dead letter routing key the dead-lettered message keeps the key it was published with. */
+    @Test
+    public void originalRoutingKeyIsReceived() throws IOException, InterruptedException {
+        final DeadLetterRoutingKeyConsumer deadLetters = publishAndAwaitDeadLetter("ex9");
+
+        assertListSize(deadLetters.getReceived(), 1);
+        assertEquals("rk1", deadLetters.getRoutingKey());
+    }
+
+    /** With {@code deadLetterRoutingKey=rk2} the dead-lettered message carries that key instead. */
+    @Test
+    public void deadLetterRoutingKeyIsReceived() throws IOException, InterruptedException {
+        final DeadLetterRoutingKeyConsumer deadLetters = publishAndAwaitDeadLetter("ex10");
+
+        assertListSize(deadLetters.getReceived(), 1);
+        assertEquals("rk2", deadLetters.getRoutingKey());
+    }
+
+    /**
+     * Publishes one message with routing key {@code rk1} to the given exchange while the mock endpoint
+     * rejects every exchange, then consumes from {@code dlq} until the first delivery arrives.
+     *
+     * @param exchangeName the exchange to publish the message to
+     * @return the consumer holding whatever was delivered from the dead letter queue
+     * @throws IOException if publishing or subscribing fails
+     * @throws InterruptedException if the thread is interrupted while waiting for the delivery
+     */
+    private DeadLetterRoutingKeyConsumer publishAndAwaitDeadLetter(final String exchangeName) throws IOException, InterruptedException {
+        final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
+            .contentType("text/plain")
+            .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+
+        receivedEndpoint.whenAnyExchangeReceived(exchange -> {
+            throw new Exception("Simulated exception");
+        });
+
+        channel.basicPublish(exchangeName, "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
+
+        final DeadLetterRoutingKeyConsumer deadLetters = new DeadLetterRoutingKeyConsumer(deadLetterChannel);
+        deadLetterChannel.basicConsume("dlq", true, deadLetters);
+
+        // Wait for the delivery itself rather than sleeping for a fixed time: returns as soon as the
+        // message arrives and tolerates a slow broker up to the timeout.
+        assertTrue("No message arrived on dlq within " + DELIVERY_TIMEOUT_SECONDS + " seconds",
+                   deadLetters.awaitFirstDelivery());
+        return deadLetters;
+    }
+
+    /**
+     * Records body and routing key of every delivery. Deliveries are handled on a thread other than
+     * the test thread, so all recorded state is kept in thread-safe holders.
+     */
+    private static final class DeadLetterRoutingKeyConsumer extends DefaultConsumer {
+        private final List<String> received = Collections.synchronizedList(new ArrayList<>());
+        private final StringBuffer routingKey = new StringBuffer();
+        private final CountDownLatch firstDelivery = new CountDownLatch(1);
+
+        DeadLetterRoutingKeyConsumer(final Channel channel) {
+            super(channel);
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+            LOGGER.info("AMQP.BasicProperties: {}", properties);
+
+            // The message is published as UTF-8, so decode it as UTF-8 instead of the platform default.
+            received.add(new String(body, StandardCharsets.UTF_8));
+            routingKey.append(envelope.getRoutingKey());
+            // Signal last, so everything recorded above is visible to the waiting test thread.
+            firstDelivery.countDown();
+        }
+
+        /** Blocks until the first delivery was recorded; returns {@code false} on timeout. */
+        boolean awaitFirstDelivery() throws InterruptedException {
+            return firstDelivery.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        }
+
+        /** Returns the bodies of all deliveries recorded so far. */
+        List<String> getReceived() {
+            return received;
+        }
+
+        /** Returns the routing keys of all deliveries recorded so far, concatenated in arrival order. */
+        String getRoutingKey() {
+            return routingKey.toString();
+        }
+    }
+}