Author: aidan
Date: Thu Sep 18 09:12:46 2008
New Revision: 696686

URL: http://svn.apache.org/viewvc?rev=696686&view=rev
Log:
QPID-1286:  make sure priority queues don't mess with deleted subscriptions

AMQPriorityQueue: don't advance deleted subscriptions
AMQPriorityQueueTest: Add test class for priority queues
SimpleAMQQueueTest: Add more tests
PriorityTest: Check for more message orders

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=696686&r1=696685&r2=696686&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
 Thu Sep 18 09:12:46 2008
@@ -52,8 +52,12 @@
         while(subIter.advance() && !entry.isAcquired())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
+            if (subIter.getNode().isDeleted())
+            {
+                continue;
+            }
             QueueEntry subnode = subscription.getLastSeenEntry();
-            while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
+            while(entry.compareTo(subnode) < 0 && !entry.isAcquired())
             {
                 if(subscription.setLastSeenEntry(subnode,entry))
                 {

Added: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=696686&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 Thu Sep 18 09:12:46 2008
@@ -0,0 +1,90 @@
+package org.apache.qpid.server.queue;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.util.ArrayList;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+
+public class AMQPriorityQueueTest extends SimpleAMQQueueTest
+{
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        arguments = new FieldTable();
+        arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+        super.setUp();
+    }
+
+    public void testPriorityOrdering() throws AMQException, InterruptedException
+    {
+
+        // Enqueue messages in order
+        queue.enqueue(null, createMessage(1L, (byte) 10));
+        queue.enqueue(null, createMessage(2L, (byte) 4));
+        queue.enqueue(null, createMessage(3L, (byte) 0));
+        
+        // Enqueue messages in reverse order
+        queue.enqueue(null, createMessage(4L, (byte) 0));
+        queue.enqueue(null, createMessage(5L, (byte) 4));
+        queue.enqueue(null, createMessage(6L, (byte) 10));
+        
+        // Enqueue messages out of order
+        queue.enqueue(null, createMessage(7L, (byte) 4));
+        queue.enqueue(null, createMessage(8L, (byte) 10));
+        queue.enqueue(null, createMessage(9L, (byte) 0));
+        
+        // Register subscriber
+        queue.registerSubscription(subscription, false);
+        Thread.sleep(150);
+        
+        ArrayList<QueueEntry> msgs = subscription.getMessages();
+        assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
+        assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
+        assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+
+        assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
+        assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
+        assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+        
+        assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
+        assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
+        assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+    }
+
+    protected AMQMessage createMessage(Long id, byte i) throws AMQException
+    {
+        AMQMessage msg = super.createMessage(id);
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        props.setPriority(i);
+        msg.getContentHeaderBody().properties = props;
+        return msg;
+    }
+    
+    protected AMQMessage createMessage(Long id) throws AMQException
+    {
+        return createMessage(id, (byte) 0);
+    }
+    
+}

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=696686&r1=696685&r2=696686&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Thu Sep 18 09:12:46 2008
@@ -23,29 +23,37 @@
 
 import java.util.List;
 
+import junit.framework.TestCase;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
-import junit.framework.TestCase;
 
 public class SimpleAMQQueueTest extends TestCase
 {
 
-    private SimpleAMQQueue _queue;
-    private VirtualHost _virtualHost;
-    private MessageStore store = new TestableMemoryMessageStore();
-    private TransactionalContext ctx = new NonTransactionalContext(store, new StoreContext(), null, null);
-    private MessageHandleFactory factory = new MessageHandleFactory();
-
+    protected SimpleAMQQueue queue;
+    protected VirtualHost virtualHost;
+    protected MessageStore store = new TestableMemoryMessageStore();
+    protected AMQShortString qname = new AMQShortString("qname");
+    protected AMQShortString owner = new AMQShortString("owner");
+    protected AMQShortString routingKey = new AMQShortString("routing key");
+    protected DirectExchange exchange = new DirectExchange();
+    protected MockSubscription subscription = new MockSubscription();
+    protected FieldTable arguments = null;
+    
     MessagePublishInfo info = new MessagePublishInfo()
     {
 
@@ -74,7 +82,7 @@
             return null;
         }
     };
-
+    
     @Override
     protected void setUp() throws Exception
     {
@@ -82,30 +90,198 @@
         //Create Application Registry for test
         ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
 
-        AMQShortString qname = new AMQShortString("qname");
-        AMQShortString owner = new AMQShortString("owner");
-        _virtualHost = new VirtualHost("vhost", store);
-        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, _virtualHost, null);
-        
-        applicationRegistry .getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+        virtualHost = new VirtualHost("vhost", store);
+        applicationRegistry.getVirtualHostRegistry().registerVirtualHost(virtualHost);
+
+        queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, virtualHost, arguments);
     }
 
     @Override
     protected void tearDown()
     {
+        queue.stop();
         ApplicationRegistry.remove(1);
     }
 
+    public void testCreateQueue() throws AMQException
+    {
+        queue.stop();
+        try {
+            queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, owner, false, virtualHost, arguments );
+            assertNull("Queue was created", queue);
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertTrue("Exception was not about missing name", 
+                            e.getMessage().contains("name"));
+        }
+        
+        try {
+            queue = new SimpleAMQQueue(qname, false, owner, false, null);
+            assertNull("Queue was created", queue);
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertTrue("Exception was not about missing vhost", 
+                    e.getMessage().contains("Host"));
+        }
+
+        queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false,
+                                                                virtualHost, arguments);
+        assertNotNull("Queue was not created", queue);
+    }
+    
+    public void testGetVirtualHost()
+    {
+        assertEquals("Virtual host was wrong", virtualHost, queue.getVirtualHost());
+    }
+    
+    public void testBinding()
+    {
+        try
+        {
+            queue.bind(exchange, routingKey, null);
+            assertTrue("Routing key was not bound", 
+                            exchange.getBindings().containsKey(routingKey));
+            assertEquals("Queue was not bound to key", 
+                        exchange.getBindings().get(routingKey).get(0),
+                        queue);
+            assertEquals("Exchange binding count", 1, 
+                    queue.getExchangeBindings().size());
+            assertEquals("Wrong exchange bound", routingKey, 
+                    queue.getExchangeBindings().get(0).getRoutingKey());
+            assertEquals("Wrong exchange bound", exchange, 
+                    queue.getExchangeBindings().get(0).getExchange());
+            
+            queue.unBind(exchange, routingKey, null);
+            assertFalse("Routing key was still bound", 
+                    exchange.getBindings().containsKey(routingKey));
+            assertNull("Routing key was not empty", 
+                    exchange.getBindings().get(routingKey));
+        }
+        catch (AMQException e)
+        {
+            assertNull("Unexpected exception", e);
+        }
+    }
+    
+    public void testSubscription() throws AMQException
+    {
+        // Check adding a subscription adds it to the queue
+        queue.registerSubscription(subscription, false);
+        assertEquals("Subscription did not get queue", queue, 
+                      subscription.getQueue());
+        assertEquals("Queue does not have consumer", 1, 
+                     queue.getConsumerCount());
+        assertEquals("Queue does not have active consumer", 1, 
+                queue.getActiveConsumerCount());
+        
+        // Check sending a message ends up with the subscriber
+        AMQMessage messageA = createMessage(new Long(24));
+        queue.enqueue(null, messageA);
+        assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+        
+        // Check removing the subscription removes its information from the queue
+        queue.unregisterSubscription(subscription);
+        assertTrue("Subscription still had queue", subscription.isClosed());
+        assertFalse("Queue still has consumer", 1 == queue.getConsumerCount());
+        assertFalse("Queue still has active consumer", 
+                1 == queue.getActiveConsumerCount());
+        
+        AMQMessage messageB = createMessage(new Long (25));
+        queue.enqueue(null, messageB);
+        QueueEntry entry = subscription.getLastSeenEntry();
+        assertNull(entry);
+    }
+    
+    public void testQueueNoSubscriber() throws AMQException, InterruptedException
+    {
+        AMQMessage messageA = createMessage(new Long(24));
+        queue.enqueue(null, messageA);
+        queue.registerSubscription(subscription, false);
+        Thread.sleep(150);
+        assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+    }
+
+    public void testExclusiveConsumer() throws AMQException
+    {
+        // Check adding an exclusive subscription adds it to the queue
+        queue.registerSubscription(subscription, true);
+        assertEquals("Subscription did not get queue", queue, 
+                subscription.getQueue());
+        assertEquals("Queue does not have consumer", 1, 
+                queue.getConsumerCount());
+        assertEquals("Queue does not have active consumer", 1, 
+                queue.getActiveConsumerCount());
+
+        // Check sending a message ends up with the subscriber
+        AMQMessage messageA = createMessage(new Long(24));
+        queue.enqueue(null, messageA);
+        assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+        
+        // Check we cannot add a second subscriber to the queue
+        Subscription subB = new MockSubscription();
+        Exception ex = null;
+        try
+        {
+            queue.registerSubscription(subB, false);
+        }
+        catch (AMQException e)
+        {
+           ex = e; 
+        }
+        assertNotNull(ex);
+        assertTrue(ex instanceof AMQException);
+
+        // Check we cannot add an exclusive subscriber to a queue with an 
+        // existing subscription
+        queue.unregisterSubscription(subscription);
+        queue.registerSubscription(subscription, false);
+        try
+        {
+            queue.registerSubscription(subB, true);
+        }
+        catch (AMQException e)
+        {
+           ex = e; 
+        }
+        assertNotNull(ex);
+    }
+    
+    public void testAutoDeleteQueue() throws Exception 
+    {
+       queue.stop();
+       queue = new SimpleAMQQueue(qname, false, owner, true, virtualHost);
+       queue.registerSubscription(subscription, false);
+       AMQMessage message = createMessage(new Long(25));
+       queue.enqueue(null, message);
+       queue.unregisterSubscription(subscription);
+       assertTrue("Queue was not deleted when subscription was removed",
+                  queue.isDeleted());
+    }
+    
+    public void testResend() throws Exception
+    {
+        queue.registerSubscription(subscription, false);
+        Long id = new Long(26);
+        AMQMessage message = createMessage(id);
+        queue.enqueue(null, message);
+        QueueEntry entry = subscription.getLastSeenEntry();
+        entry.setRedelivered(true);
+        queue.resend(entry, subscription);
+        
+    }
+    
     public void testGetFirstMessageId() throws Exception
     {
         // Create message
         Long messageId = new Long(23);
-        AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+        AMQMessage message = createMessage(messageId);
 
         // Put message on queue
-        _queue.enqueue(null, message);
+        queue.enqueue(null, message);
         // Get message id
-        Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
+        Long testmsgid = queue.getMessagesOnTheQueue(1).get(0);
 
         // Check message id
         assertEquals("Message ID was wrong", messageId, testmsgid);
@@ -117,12 +293,12 @@
         {
             // Create message
             Long messageId = new Long(i);
-            AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+            AMQMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(null, message);
+            queue.enqueue(null, message);
         }
         // Get message ids
-        List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+        List<Long> msgids = queue.getMessagesOnTheQueue(5);
 
         // Check message id
         for (int i = 0; i < 5; i++)
@@ -138,12 +314,12 @@
         {
             // Create message
             Long messageId = new Long(i);
-            AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+            AMQMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(null, message);
+            queue.enqueue(null, message);
         }
         // Get message ids
-        List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
+        List<Long> msgids = queue.getMessagesOnTheQueue(5, 5);
 
         // Check message id
         for (int i = 0; i < 5; i++)
@@ -210,4 +386,10 @@
             assertEquals("Wrong count for message with tag " + _tag, expected, _count);
         }
     }
+    
+    protected AMQMessage createMessage(Long id) throws AMQException
+    {
+        AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+        return messageA;
+    }
 }

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=696686&r1=696685&r2=696686&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
 Thu Sep 18 09:12:46 2008
@@ -52,9 +52,19 @@
     protected final String VHOST = "/test";
     protected final String QUEUE = "PriorityQueue";
 
-
     private static final int MSG_COUNT = 50;
 
+    private Context context = null;
+    private Connection producerConnection;
+    private MessageProducer producer;
+    private Session producerSession;
+    private Queue queue;
+    private Connection consumerConnection;
+    private Session consumerSession;
+
+
+    private MessageConsumer consumer;
+    
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -64,7 +74,21 @@
             TransportConnection.createVMBroker(1);
         }
 
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://guest:[EMAIL PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        context = factory.getInitialContext(env);
+        producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        producerConnection.start();
+        
+        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
     }
 
     private boolean usingInVMBroker()
@@ -74,6 +98,8 @@
 
     protected void tearDown() throws Exception
     {
+        producerConnection.close();
+        consumerConnection.close();
         if (usingInVMBroker())
         {
             TransportConnection.killAllVMBrokers();
@@ -83,65 +109,25 @@
 
     public void testPriority() throws JMSException, NamingException, AMQException
     {
-        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
-        Hashtable<String, String> env = new Hashtable<String, String>();
-
-        env.put("connectionfactory.connection", "amqp://guest:[EMAIL PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
-        env.put("queue.queue", QUEUE);
-
-        Context context = factory.getInitialContext(env);
-
-        Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
-
-        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-priorities",10);
-
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
-
-        Queue queue = new AMQQueue("amq.direct",QUEUE);
-
+        queue = new AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-
-
-
-
-
-
-        producerConnection.start();
-
-
-        MessageProducer producer = producerSession.createProducer(queue);
-
-
-
-
+        producer = producerSession.createProducer(queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             producer.setPriority(msg % 10);
             producer.send(nextMessage(msg, false, producerSession, producer));
         }
-
         producer.close();
         producerSession.close();
         producerConnection.close();
 
-
-        Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
-        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = consumerSession.createConsumer(queue);
-
-
-
-
+        consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
-
         Message received;
-        //Receive Message 0
-        StringBuilder buf = new StringBuilder();
         int receivedCount = 0;
         Message previous = null;
         int messageCount = 0;
@@ -158,10 +144,78 @@
         }
 
         assertEquals("Incorrect number of message received", 50, receivedCount);
+    }
+    
+    public void testOddOrdering() throws AMQException, JMSException
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-priorities",3);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+        
+        // In order ABC
+        producer.setPriority(9);
+        producer.send(nextMessage(1, false, producerSession, producer));
+        producer.setPriority(4);
+        producer.send(nextMessage(2, false, producerSession, producer));
+        producer.setPriority(1);
+        producer.send(nextMessage(3, false, producerSession, producer));
+
+        // Out of order BAC
+        producer.setPriority(4);
+        producer.send(nextMessage(4, false, producerSession, producer));
+        producer.setPriority(9);
+        producer.send(nextMessage(5, false, producerSession, producer));
+        producer.setPriority(1);
+        producer.send(nextMessage(6, false, producerSession, producer));
+
+        // Out of order BCA 
+        producer.setPriority(4);
+        producer.send(nextMessage(7, false, producerSession, producer));
+        producer.setPriority(1);
+        producer.send(nextMessage(8, false, producerSession, producer));
+        producer.setPriority(9);
+        producer.send(nextMessage(9, false, producerSession, producer));
+        
+        // Reverse order CBA
+        producer.setPriority(1);
+        producer.send(nextMessage(10, false, producerSession, producer));
+        producer.setPriority(4);
+        producer.send(nextMessage(11, false, producerSession, producer));
+        producer.setPriority(9);
+        producer.send(nextMessage(12, false, producerSession, producer));
 
-        producerSession.close();
-        producer.close();
-
+        consumer = consumerSession.createConsumer(queue);
+        consumerConnection.start();
+        
+        Message msg = consumer.receive(500);
+        assertEquals(1, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(5, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(9, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(12, msg.getIntProperty("msg"));
+        
+        msg = consumer.receive(500);
+        assertEquals(2, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(4, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(7, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(11, msg.getIntProperty("msg"));
+        
+        msg = consumer.receive(500);
+        assertEquals(3, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(6, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(8, msg.getIntProperty("msg"));
+        msg = consumer.receive(500);
+        assertEquals(10, msg.getIntProperty("msg"));
     }
 
     private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException


Reply via email to