Author: rgreig
Date: Tue Dec 12 05:22:45 2006
New Revision: 486132
URL: http://svn.apache.org/viewvc?view=rev&rev=486132
Log:
QPID-95 : Patch supplied by Rob Godfrey - throws correct exception when
encountering a non-routable mandatory message
Added:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
Tue Dec 12 05:22:45 2006
@@ -206,7 +206,18 @@
}
if (!delivered)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
Modified:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Tue Dec 12 05:22:45 2006
@@ -81,25 +81,45 @@
protected void routeAndTest(Message m, TestQueue... expected) throws
AMQException
{
- routeAndTest(m, Arrays.asList(expected));
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue...
expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
}
protected void routeAndTest(Message m, List<TestQueue> expected) throws
AMQException
{
- route(m);
- for (TestQueue q : queues)
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn,
List<TestQueue> expected) throws AMQException
+ {
+ try
{
- if (expected.contains(q))
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag,
and lack of routing",expectReturn);
+ for (TestQueue q : queues)
{
- assertTrue("Expected " + m + " to be delivered to " + q,
m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered
to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " +
q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be
delivered to " + q;
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q,
m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be
delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to "
+ q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be
delivered to " + q;
+ }
}
}
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
}
static FieldTable getHeaders(String... entries)
Modified:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Tue Dec 12 05:22:45 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
@@ -52,6 +53,19 @@
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
routeAndTest(new Message("Message6", "F0002"));
+
+ Message m7 = new Message("Message7", "XXXXX");
+
+ BasicPublishBody pb7 = m7.getPublishBody();
+ pb7.mandatory = true;
+ routeAndTest(m7,true);
+
+ Message m8 = new Message("Message8", "F0000");
+ BasicPublishBody pb8 = m8.getPublishBody();
+ pb8.mandatory = true;
+ routeAndTest(m8,false,q1);
+
+
}
public void testAny() throws AMQException
@@ -69,6 +83,20 @@
routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2,
q3, q4, q6);
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q6);
routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public void testMandatory() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ Message m1 = new Message("Message1", "XXXXX");
+ Message m2 = new Message("Message2", "F0000");
+ BasicPublishBody pb1 = m1.getPublishBody();
+ pb1.mandatory = true;
+ BasicPublishBody pb2 = m1.getPublishBody();
+ pb2.mandatory = true;
+ routeAndTest(m1,true);
+
+
}
public static junit.framework.Test suite()
Added:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=auto&rev=486132
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
(added)
+++
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Tue Dec 12 05:22:45 2006
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements
ExceptionListener
+{
+ private static final Logger _logger =
Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+ private final List<Message> _bouncedMessageList =
Collections.synchronizedList(new ArrayList<Message>());
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " +
tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+// DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Tests that mandatory message which are not routable are returned to the
producer
+ *
+ * @throws Exception
+ */
+ public void testReturnUnroutableMandatoryMessage() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection("vm://:1", "guest", "guest",
"consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore)
ApplicationRegistry.getInstance().getMessageStore();
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new
AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+
BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue,
AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK,
false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME,
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced
messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer =
producerSession.createProducer(queue,false,false);
+ MessageProducer mandatoryProducer =
producerSession.createProducer(queue);
+
+
+ // First test - should neither be bounced nor routed
+ _logger.info("Sending non-routable non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000","1");
+ mandatoryProducer.send(msg3);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver",tm != null);
+ assertTrue("Wrong message routed to receiver:
"+tm.getText(),"msg3".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch(InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1):
"+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced:
"+m.toString(),m.toString().contains("msg2"));
+
+
+
+
+ con.close();
+ con2.close();
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new
junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ _logger.warn("Caught exception on producer: ",jmsException);
+ Exception linkedException = jmsException.getLinkedException();
+ if(linkedException instanceof AMQNoRouteException)
+ {
+ AMQNoRouteException noRoute = (AMQNoRouteException)
linkedException;
+ Message bounced = (Message) noRoute.getUndeliveredMessage();
+ _bouncedMessageList.add(bounced);
+ }
+ }
+}