Author: rgreig
Date: Tue Dec 12 05:22:45 2006
New Revision: 486132

URL: http://svn.apache.org/viewvc?view=rev&rev=486132
Log:
QPID-95 : Patch supplied by Rob Godfrey - throws correct exception when 
encountering a non-routable mandatory message

Added:
    
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 Tue Dec 12 05:22:45 2006
@@ -206,7 +206,18 @@
         }
         if (!delivered)
         {
-            _logger.warn("Exchange " + getName() + ": message not routable.");
+
+            String msg = "Exchange " + getName() + ": message not routable.";
+
+            if (payload.getPublishBody().mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn(msg);
+            }
+
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 Tue Dec 12 05:22:45 2006
@@ -81,25 +81,45 @@
 
     protected void routeAndTest(Message m, TestQueue... expected) throws 
AMQException
     {
-        routeAndTest(m, Arrays.asList(expected));
+        routeAndTest(m, false, Arrays.asList(expected));
+    }
+
+    protected void routeAndTest(Message m, boolean expectReturn, TestQueue... 
expected) throws AMQException
+    {
+        routeAndTest(m, expectReturn, Arrays.asList(expected));
     }
 
     protected void routeAndTest(Message m, List<TestQueue> expected) throws 
AMQException
     {
-        route(m);
-        for (TestQueue q : queues)
+        routeAndTest(m, false, expected);
+    }
+
+    /**
+     * Routes the message and checks the outcome against the expectation.
+     *
+     * If routing completes normally, the message must not have been expected to be
+     * returned, and it must be present in exactly the queues listed in expected.
+     * If routing throws NoRouteException, the test only passes when a return was expected.
+     *
+     * @param m            the message to route
+     * @param expectReturn true if routing is expected to fail with NoRouteException
+     * @param expected     the queues the message should be delivered to
+     * @throws AMQException if routing fails for any reason other than NoRouteException
+     */
+    protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+    {
+        try
         {
-            if (expected.contains(q))
+            route(m);
+            // Reaching this line means no NoRouteException was thrown.
+            assertFalse("Expected "+m+" to be returned due to mandatory flag, and lack of routing",expectReturn);
+            for (TestQueue q : queues)
             {
-                assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
-                //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
-            }
-            else
-            {
-                assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
-                //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+                if (expected.contains(q))
+                {
+                    assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+                    //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+                }
+                else
+                {
+                    assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+                    //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+                }
             }
         }
+
+        catch (NoRouteException ex)
+        {
+            // The message was returned; that is only acceptable when the caller asked for it.
+            assertTrue("Expected "+m+" not to be returned",expectReturn);
+        }
+
     }
 
     static FieldTable getHeaders(String... entries)

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=486132&r1=486131&r2=486132
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
 Tue Dec 12 05:22:45 2006
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
 
 public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
 {
@@ -52,6 +53,19 @@
         routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
                      q1, q2, q3, q4, q5, q6, q7, q8);
         routeAndTest(new Message("Message6", "F0002"));
+
+        Message m7 = new Message("Message7", "XXXXX");
+
+        BasicPublishBody pb7 = m7.getPublishBody();
+        pb7.mandatory = true;
+        routeAndTest(m7,true);
+
+        Message m8 = new Message("Message8", "F0000");
+        BasicPublishBody pb8 = m8.getPublishBody();
+        pb8.mandatory = true;
+        routeAndTest(m8,false,q1);
+
+
     }
 
     public void testAny() throws AMQException
@@ -69,6 +83,20 @@
         routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, 
q3, q4, q6);
         routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), 
q1, q2, q3, q4, q6);
         routeAndTest(new Message("Message6", "F0002"));
+    }
+
+    /**
+     * Exercises the mandatory flag with a single queue bound on header "F0000":
+     * a mandatory message that matches no binding is expected to be returned,
+     * while a mandatory message carrying the bound header is expected to be delivered.
+     */
+    public void testMandatory() throws AMQException
+    {
+        TestQueue q1 = bindDefault("F0000");
+        Message m1 = new Message("Message1", "XXXXX");
+        Message m2 = new Message("Message2", "F0000");
+        BasicPublishBody pb1 = m1.getPublishBody();
+        pb1.mandatory = true;
+        // Flag m2 itself; previously m1's publish body was fetched a second time here,
+        // leaving m2 non-mandatory.
+        BasicPublishBody pb2 = m2.getPublishBody();
+        pb2.mandatory = true;
+        // m1 carries only "XXXXX", which q1's binding does not mention: expect a return.
+        routeAndTest(m1,true);
+        // m2 carries the bound header: expect delivery to q1 and no return.
+        routeAndTest(m2,false,q1);
+
+    }
 
     public static junit.framework.Test suite()

Added: 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=auto&rev=486132
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
 Tue Dec 12 05:22:45 2006
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+/**
+ * System test checking that a mandatory message which cannot be routed is bounced
+ * back to the producing connection, while non-mandatory unroutable messages are
+ * not bounced and routable messages reach the consumer.
+ *
+ * Bounced messages are collected through the JMS ExceptionListener registered on
+ * the producer connection.
+ */
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
+{
+    private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+    /** Messages handed back via onException; synchronized because the listener runs off the test thread. */
+    private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+
+    static
+    {
+        // Fall back to the JVM temp directory when QPID_WORK has not been configured.
+        String workdir = System.getProperty("QPID_WORK");
+        if (workdir == null || workdir.equals(""))
+        {
+            String tempdir = System.getProperty("java.io.tmpdir");
+            System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+            System.setProperty("QPID_WORK", tempdir);
+        }
+//        DOMConfigurator.configure("../broker/etc/log4j.xml");
+    }
+
+    /** Initialises application registry instance 1 with a TestApplicationRegistry. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+    }
+
+    /**
+     * Tests that mandatory messages which are not routable are returned to the producer.
+     *
+     * Three messages are sent: a non-mandatory unroutable one (neither bounced nor
+     * delivered), a mandatory unroutable one (bounced) and a routable one (delivered).
+     * Both connections are closed even when an assertion or send fails.
+     *
+     * @throws Exception on any connection, session or messaging failure
+     */
+    public void testReturnUnroutableMandatoryMessage() throws Exception
+    {
+        _bouncedMessageList.clear();
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con2 = null;
+        try
+        {
+            // Not used below; kept because the cast fails fast if the registry is not
+            // backed by the testable store (presumably intentional - confirm).
+            TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+            AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+            FieldTable ft = new PropertyFieldTable();
+            ft.setString("F1000","1");
+            MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+            //force synch to ensure the consumer has resulted in a bound queue
+            consumerSession.declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+            con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+            con2.setExceptionListener(this);
+            AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            // Need to start the "producer" connection in order to receive bounced messages
+            _logger.info("Starting producer connection");
+            con2.start();
+
+            MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+            MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+            // First test - should neither be bounced nor routed
+            _logger.info("Sending non-routable non-mandatory message");
+            TextMessage msg1 =  producerSession.createTextMessage("msg1");
+            nonMandatoryProducer.send(msg1);
+
+            // Second test - should be bounced
+            _logger.info("Sending non-routable mandatory message");
+            TextMessage msg2 =  producerSession.createTextMessage("msg2");
+            mandatoryProducer.send(msg2);
+
+            // Third test - should be routed
+            _logger.info("Sending routable message");
+            TextMessage msg3 =  producerSession.createTextMessage("msg3");
+            msg3.setStringProperty("F1000","1");
+            mandatoryProducer.send(msg3);
+
+            _logger.info("Starting consumer connection");
+            con.start();
+            TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+            assertNotNull("No message routed to receiver",tm);
+            assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+
+            // Give the asynchronous exception listener time to receive the bounce.
+            try
+            {
+                Thread.sleep(1000L);
+            }
+            catch(InterruptedException e)
+            {
+                // Restore the interrupt flag rather than silently discarding it.
+                Thread.currentThread().interrupt();
+            }
+
+            assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+            Message m = _bouncedMessageList.get(0);
+            assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
+        }
+        finally
+        {
+            // Close both connections regardless of the test outcome.
+            try
+            {
+                con.close();
+            }
+            finally
+            {
+                if (con2 != null)
+                {
+                    con2.close();
+                }
+            }
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+    }
+
+    /**
+     * Records the undelivered message whenever the producer connection reports an
+     * exception whose linked exception is an AMQNoRouteException; other exceptions
+     * are only logged.
+     *
+     * @param jmsException the exception reported on the producer connection
+     */
+    public void onException(JMSException jmsException)
+    {
+        _logger.warn("Caught exception on producer: ",jmsException);
+        Exception linkedException = jmsException.getLinkedException();
+        if(linkedException instanceof AMQNoRouteException)
+        {
+            AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+            Message bounced = (Message) noRoute.getUndeliveredMessage();
+            _bouncedMessageList.add(bounced);
+        }
+    }
+}


Reply via email to