Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 Thu Jun 19 02:01:59 2008
@@ -23,24 +23,26 @@
 import junit.framework.TestCase;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.Collections;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -49,7 +51,7 @@
 {
     private static final Logger _log = Logger.getLogger(AckTest.class);
 
-    private SubscriptionImpl _subscription;
+    private Subscription _subscription;
 
     private MockProtocolSession _protocolSession;
 
@@ -57,9 +59,7 @@
 
     private StoreContext _storeContext = new StoreContext();
 
-    private AMQChannel _channel;
-
-    private SubscriptionSet _subscriptionManager;
+    private AMQChannel _channel;                   
 
     private AMQQueue _queue;
 
@@ -75,11 +75,13 @@
         super.setUp();
         _messageStore = new TestableMemoryMessageStore();
         _protocolSession = new MockProtocolSession(_messageStore);
-        _channel = new AMQChannel(_protocolSession, 5, _messageStore);
+        _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont 
need exchange registry*/);
 
         _protocolSession.addChannel(_channel);
-        _subscriptionManager = new SubscriptionSet();
-        _queue = new AMQQueue(new AMQShortString("myQ"), false, new 
AMQShortString("guest"), true, 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
 _subscriptionManager);
+
+        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), 
false, new AMQShortString("guest"), true, 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+                                                    null);
+
     }
 
     private void publishMessages(int count) throws AMQException
@@ -92,6 +94,7 @@
         TransactionalContext txnContext = new 
NonTransactionalContext(_messageStore, _storeContext, null,
                                                                       new 
LinkedList<RequiredDeliveryException>()
         );
+        _queue.registerSubscription(_subscription,false);
         MessageHandleFactory factory = new MessageHandleFactory();
         for (int i = 1; i <= count; i++)
         {
@@ -125,7 +128,8 @@
                     return new AMQShortString("rk");
                 }
             };
-            AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), 
publishBody, txnContext);
+            IncomingMessage msg = new 
IncomingMessage(_messageStore.getNewMessageId(), publishBody, 
txnContext,_protocolSession);
+            //IncomingMessage msg2 = null;
             if (persistent)
             {
                 BasicContentHeaderProperties b = new 
BasicContentHeaderProperties();
@@ -142,10 +146,14 @@
             // we increment the reference here since we are not delivering the 
messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to 
construct if we have direct access to the
             // subscription
-            msg.incrementReference();
-            msg.routingComplete(_messageStore, _storeContext, factory);
+            msg.enqueue(Collections.singleton(_queue));
+            msg.routingComplete(_messageStore, factory);
+            if(msg.allContentReceived())
+            {
+                msg.deliverToQueues();
+            }
             // we manually send the message to the subscription
-            _subscription.send(new QueueEntry(_queue,msg), _queue);
+            //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }
     }
 
@@ -155,16 +163,13 @@
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new 
LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount);
         assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
-        
-        //DTX
-        //       assertTrue(_messageStore.getNumberStoredMessages() == 
msgCount);
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -172,15 +177,12 @@
         {
             assertTrue(deliveryTag == i);
             i++;
-            UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+            QueueEntry unackedMsg = map.get(deliveryTag);
             assertTrue(unackedMsg.getQueue() == _queue);
         }
 
         assertTrue(map.size() == msgCount);
         assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
-        
-        //DTX
-//        assertTrue(_messageStore.getNumberStoredMessages() == msgCount);
     }
 
     /**
@@ -189,15 +191,32 @@
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, false);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new 
LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
         assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        //DTX MessageStore
-//        assertTrue(_messageStore.getNumberStoredMessages() == 0);
+        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
+    }
+
+    /**
+     * Tests that in no-ack mode no messages are retained
+     */
+    public void testPersistentNoAckMode() throws AMQException
+    {
+        // false arg means no acks expected
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new 
LimitlessCreditManager());
+        final int msgCount = 10;
+        publishMessages(msgCount, true);
+
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 0);
+        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
     }
 
     /**
@@ -206,7 +225,7 @@
      */
     public void testSingleAckReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new 
LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -219,7 +238,7 @@
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i);
-            UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+            QueueEntry unackedMsg = map.get(deliveryTag);
             assertTrue(unackedMsg.getQueue() == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
@@ -235,7 +254,7 @@
      */
     public void testMultiAckReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new 
LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -248,7 +267,7 @@
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+            QueueEntry unackedMsg = map.get(deliveryTag);
             assertTrue(unackedMsg.getQueue() == _queue);
             ++i;
         }
@@ -259,7 +278,7 @@
      */
     public void testMultiAckAllReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new 
LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -272,18 +291,19 @@
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+            QueueEntry unackedMsg = map.get(deliveryTag);
             assertTrue(unackedMsg.getQueue() == _queue);
             ++i;
         }
     }
 
+/*
     public void testPrefetchHighLow() throws AMQException
     {
         int lowMark = 5;
         int highMark = 10;
 
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new 
LimitlessCreditManager());
         _channel.setPrefetchLowMarkCount(lowMark);
         _channel.setPrefetchHighMarkCount(highMark);
 
@@ -332,10 +352,12 @@
         assertTrue(map.size() == 0);
     }
 
+*/
+/*
     public void testPrefetch() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, 
DEFAULT_CONSUMER_TAG, true);
-        _channel.setPrefetchCount(5);
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, 
_protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new 
LimitlessCreditManager());
+        _channel.setMessageCredit(5);
 
         assertTrue(_channel.getPrefetchCount() == 5);
 
@@ -360,6 +382,7 @@
         assertTrue(map.size() == 0);
     }
 
+*/
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(AckTest.class);

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
 Thu Jun 19 02:01:59 2008
@@ -24,7 +24,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -39,7 +39,7 @@
 
 class MessageTestHelper extends TestCase
 {
-    private final MessageStore _messageStore = new MemoryMessageStore();
+    private final MessageStore _messageStore = new SkeletonMessageStore();
 
     private final StoreContext _storeContext = new StoreContext();
 
@@ -52,12 +52,12 @@
         ApplicationRegistry.initialise(new NullApplicationRegistry());
     }
 
-    QueueEntry message() throws AMQException
+    QueueEntryImpl message() throws AMQException
     {
         return message(false);
     }
 
-    QueueEntry message(final boolean immediate) throws AMQException
+    QueueEntryImpl message(final boolean immediate) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
         {
@@ -87,9 +87,16 @@
                 return null;
             }
         };
-                              
-        return new QueueEntry(null,new 
AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
-                              new ContentHeaderBody()));
+
+        //public AMQMessage(Long messageId, AMQMessageHandle messageHandle , 
TransactionalContext txnConext, MessagePublishInfo info)
+        long messageId = _messageStore.getNewMessageId();
+        final AMQMessageHandle messageHandle =
+                (new MessageHandleFactory()).createMessageHandle(messageId, 
_messageStore, false);
+        messageHandle.setPublishAndContentHeaderBody(new 
StoreContext(),publish,new ContentHeaderBody());
+        AMQMessage msg = new AMQMessage(messageHandle, 
_txnContext.getStoreContext(), publish);
+        
+
+        return new QueueEntryImpl(null,msg, Long.MIN_VALUE);
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
 Thu Jun 19 02:01:59 2008
@@ -215,6 +215,11 @@
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
+    public ProtocolSessionIdentifier getSessionIdentifier()
+    {
+        return null;
+    }
+
     public byte getProtocolMajorVersion()
     {
         return getProtocolVersion().getMajorVersion();

Added: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=669431&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
 Thu Jun 19 02:01:59 2008
@@ -0,0 +1,176 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test Case provided by client Non-functional Test NF101: heap exhaustion 
behaviour */
+public class PriorityTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+
+
+    protected final String BROKER = "vm://:1";
+    protected final String VHOST = "/test";
+    protected final String QUEUE = "PriorityQueue";
+
+
+    private static final int MSG_COUNT = 50;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        if (usingInVMBroker())
+        {
+            TransportConnection.createVMBroker(1);
+        }
+
+
+    }
+
+    private boolean usingInVMBroker()
+    {
+        return BROKER.startsWith("vm://");
+    }
+
+    protected void tearDown() throws Exception
+    {
+        if (usingInVMBroker())
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+        super.tearDown();
+    }
+
+    public void testPriority() throws JMSException, NamingException, 
AMQException
+    {
+        InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://guest:[EMAIL 
PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        Context context = factory.getInitialContext(env);
+
+        Connection producerConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
+
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-priorities",10);
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), 
true, false, false, arguments);
+
+        Queue queue = new AMQQueue("amq.direct",QUEUE);
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+
+
+
+
+
+
+        producerConnection.start();
+
+
+        MessageProducer producer = producerSession.createProducer(queue);
+
+
+
+
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.setPriority(msg % 10);
+            producer.send(nextMessage(msg, false, producerSession, producer));
+        }
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+
+        Connection consumerConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
+        Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+
+
+
+        consumerConnection.start();
+
+        Message received;
+        //Receive Message 0
+        StringBuilder buf = new StringBuilder();
+        int receivedCount = 0;
+        Message previous = null;
+        int messageCount = 0;
+        while((received = consumer.receive(1000))!=null)
+        {   
+            messageCount++;
+            if(previous != null)
+            {
+                assertTrue("Messages arrived in unexpected order " + 
messageCount + " " + previous.getIntProperty("msg") + " " + 
received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + 
received.getJMSPriority(), (previous.getJMSPriority() > 
received.getJMSPriority()) || ((previous.getJMSPriority() == 
received.getJMSPriority()) && previous.getIntProperty("msg") < 
received.getIntProperty("msg")) );
+            }
+
+            previous = received;
+            receivedCount++;
+        }
+
+        assertEquals("Incorrect number of message received", 50, 
receivedCount);
+
+        producerSession.close();
+        producer.close();
+
+    }
+
+    private Message nextMessage(int msg, boolean first, Session 
producerSession, MessageProducer producer) throws JMSException
+    {
+        Message send = producerSession.createTextMessage("Message: " + msg);
+        send.setIntProperty("msg", msg);
+
+        return send;
+    }
+
+
+}

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
 Thu Jun 19 02:01:59 2008
@@ -81,14 +81,6 @@
         System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" 
+ _logger.isEnabledFor(Level.DEBUG));
         System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" 
+ _logger.isEnabledFor(Level.TRACE));
 
-        Logger csdm = 
Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
-        System.err.println("csdm.isE-Error:" + csdm.isEnabledFor(Level.ERROR));
-        System.err.println("csdm.isE-Warn:" + csdm.isEnabledFor(Level.WARN));
-        System.err.println("csdm.isInfo:" + csdm.isInfoEnabled() + ":" + 
csdm.isEnabledFor(Level.INFO));
-        System.err.println("csdm.isDebug:" + csdm.isDebugEnabled() + ":" + 
csdm.isEnabledFor(Level.DEBUG));
-        System.err.println("csdm.isTrace:" + csdm.isTraceEnabled() + ":" + 
csdm.isEnabledFor(Level.TRACE));
-
-
         System.err.println(Logger.getRootLogger().getLoggerRepository());
 
         if (BROKER.startsWith("vm://"))
@@ -184,9 +176,14 @@
 
         try
         {
+            Thread.sleep(2000);
             long queueDepth = ((AMQSession) 
_clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
             assertEquals("Session reports Queue depth not as expected", 0, 
queueDepth);
         }
+        catch (InterruptedException e)
+        {
+            fail(e.getMessage());
+        }
         catch (NamingException e)
         {
             fail(e.getMessage());
@@ -209,7 +206,7 @@
 
     }
 
-    private void verifyAllMessagesRecevied() throws JMSException
+    private void verifyAllMessagesRecevied() throws Exception
     {
 
         boolean[] msgIdRecevied = new boolean[MSG_COUNT];
@@ -219,6 +216,8 @@
             _messages[i] = _consumer.receive(1000);
             assertNotNull("should have received a message but didn't", 
_messages[i]);
         }
+        long queueDepth = ((AMQSession) 
_clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
+        assertEquals("Session reports Queue depth not as expected", 0, 
queueDepth);
 
         //Check received messages
         int msgId = 0;

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 Thu Jun 19 02:01:59 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.framing.AMQShortString;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -54,7 +56,12 @@
         return messages;
     }
 
-    public void send(QueueEntry msg, AMQQueue queue)
+    public void setQueue(AMQQueue queue)
+    {
+        
+    }
+
+    public void send(QueueEntry msg)
     {
         messages.add(msg);
     }
@@ -79,9 +86,39 @@
         //no-op
     }
 
-    public Object getSendLock()
+    public void getSendLock()
+    {
+        return;
+    }
+
+    public void releaseSendLock()
+    {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+    }
+
+    public void resend(final QueueEntry entry)
+    {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+    }
+
+    public void restoreCredit(final QueueEntry queueEntry)
+    {
+
+    }
+
+    public void setStateListener(final StateListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+    }
+
+    public QueueEntry getLastSeenEntry()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
     {
-        return new Object();
+        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
     public AMQChannel getChannel()
@@ -94,6 +131,26 @@
         //no-op
     }
 
+    public AMQShortString getConsumerTag()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public boolean isActive()
+    {
+        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public AMQQueue getQueue()
+    {
+        return null;
+    }
+
+    public QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
     public void queueDeleted(AMQQueue queue)
     {
     }
@@ -108,6 +165,11 @@
         return true;
     }
 
+    public boolean isAutoClose()
+    {
+        return false;
+    }
+
     public Queue<QueueEntry> getPreDeliveryQueue()
     {
         return null;
@@ -157,5 +219,4 @@
     {
         return key.toString();
     }
-    
 }

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 Thu Jun 19 02:01:59 2008
@@ -22,12 +22,11 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.Exchange;
 
@@ -130,17 +129,17 @@
         return null;
     }
 
-    public void removeQueue(AMQShortString name) throws AMQException
+    public void removeQueue(final AMQQueue queue) throws AMQException
     {
 
     }
 
-    public void enqueueMessage(StoreContext context, AMQShortString name, Long 
messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final AMQQueue queue, 
Long messageId) throws AMQException
     {
 
     }
 
-    public void dequeueMessage(StoreContext context, AMQShortString name, Long 
messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, 
Long messageId) throws AMQException
     {
 
     }

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
 Thu Jun 19 02:01:59 2008
@@ -28,6 +28,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.AMQMessageHandle;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 
 /**
@@ -39,6 +40,7 @@
 
     private StoreContext _storeContext = new StoreContext();
 
+
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -50,7 +52,7 @@
      */
     public void testMessageGetsRemoved() throws AMQException
     {
-        createPersistentContentHeader();
+        ContentHeaderBody chb = createPersistentContentHeader();
 
         MessagePublishInfo info = new MessagePublishInfo()
         {
@@ -81,16 +83,22 @@
             }
         };
 
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
-                                            new 
NonTransactionalContext(_store, _storeContext, null, null),
-                                            createPersistentContentHeader());
+
+        final long messageId = _store.getNewMessageId();
+        AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+        messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
+        AMQMessage message = new AMQMessage(messageHandle,
+                                             _storeContext,info);
+
         message = message.takeReference();
 
         // we call routing complete to set up the handle
-        message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
-        assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ //       message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
+
+
+        assertEquals(1, _store.getMessageMetaDataMap().size());
         message.decrementReference(_storeContext);
-        assertTrue(_store.getMessageMetaDataMap().size() == 0);
+        assertEquals(1, _store.getMessageMetaDataMap().size());
     }
 
     private ContentHeaderBody createPersistentContentHeader()
@@ -134,18 +142,25 @@
             }
         };
 
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(),
-                                            info,
-                                            new 
NonTransactionalContext(_store, _storeContext, null, null),
-                                            createPersistentContentHeader());
+        final Long messageId = _store.getNewMessageId();
+        final ContentHeaderBody chb = createPersistentContentHeader();
+        AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+        messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
+        AMQMessage message = new AMQMessage(messageHandle,
+                                             _storeContext,
+                                            info);
+        
         
         message = message.takeReference();
         // we call routing complete to set up the handle
-        message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
-        assertTrue(_store.getMessageMetaDataMap().size() == 1);
+     //   message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
+
+
+
+        assertEquals(1, _store.getMessageMetaDataMap().size());
         message = message.takeReference();
         message.decrementReference(_storeContext);
-        assertTrue(_store.getMessageMetaDataMap().size() == 1);
+        assertEquals(1, _store.getMessageMetaDataMap().size());
     }
 
     public static junit.framework.Test suite()

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
 Thu Jun 19 02:01:59 2008
@@ -110,13 +110,10 @@
                         {
                             try
                             {
-                                long remainingMessages = ((AMQSession) 
clientSession).getQueueDepth((AMQDestination) _queue);
-                                fail("The queue should have 0 msgs left, seen 
" + _msgCount + " messages, left: "
-                                        + remainingMessages);
-                            }
-                            catch (AMQException e)
-                            {
-                                fail("Got AMQException" + e.getMessage());
+                                if(_msgCount != MSG_COUNT)
+                                {
+                                    assertEquals("Wrong number of messages 
seen.", MSG_COUNT, _msgCount);
+                                }
                             }
                             finally
                             {
@@ -124,7 +121,6 @@
                                 _awaitCompletion.countDown();
                             }
                         }
-
                     }
                     catch (JMSException e)
                     {
@@ -147,6 +143,11 @@
             fail("Unable to wait for test completion");
             throw e;
         }
+
+
+        // wait for the ack to get back
+        Thread.sleep(1000);
+
         assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) 
clientSession).getQueueDepth((AMQDestination) _queue));
     }
 

Added: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=669431&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
 Thu Jun 19 02:01:59 2008
@@ -0,0 +1,210 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.Enumeration;
+
+public class FlowControlTest extends VMTestCase
+{
+    private static final Logger _logger = 
Logger.getLogger(FlowControlTest.class);
+
+    private Connection _clientConnection;
+    private Session _clientSession;
+    private Queue _queue;
+
+    public void setUp() throws Exception
+    {
+
+        super.setUp();
+
+
+    }
+
+    /**
+     * Simply
+     */
+    public void testBasicBytesFlowControl() throws JMSException, 
NamingException, AMQException
+    {
+         _queue = new AMQQueue("amq.direct","testqueue");//(Queue) 
_context.lookup("queue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+
+        Connection producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        BytesMessage m1 = producerSession.createBytesMessage();
+        m1.writeBytes(new byte[128]);
+        m1.setIntProperty("msg",1);
+        producer.send(m1);
+        BytesMessage m2 = producerSession.createBytesMessage();
+        m2.writeBytes(new byte[128]);
+        m2.setIntProperty("msg",2);
+        producer.send(m2);
+        BytesMessage m3 = producerSession.createBytesMessage();
+        m3.writeBytes(new byte[256]);
+        m3.setIntProperty("msg",3);
+        producer.send(m3);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+
+        Connection consumerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+        Session consumerSession = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession_0_8)consumerSession).setPrefecthLimits(0,256);
+        MessageConsumer recv = consumerSession.createConsumer(_queue);
+        consumerConnection.start();
+
+        Message r1 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+        Message r2 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        Message r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r1.acknowledge();
+
+        r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r2.acknowledge();
+
+
+        r3 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+        r3.acknowledge();
+        recv.close();
+        consumerSession.close();
+        consumerConnection.close();
+
+    }
+
+    public void testTwoConsumersBytesFlowControl() throws JMSException, 
NamingException, AMQException
+    {
+         _queue = new AMQQueue("amq.direct","testqueue1");//(Queue) 
_context.lookup("queue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+
+        Connection producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        BytesMessage m1 = producerSession.createBytesMessage();
+        m1.writeBytes(new byte[128]);
+        m1.setIntProperty("msg",1);
+        producer.send(m1);
+        BytesMessage m2 = producerSession.createBytesMessage();
+        m2.writeBytes(new byte[256]);
+        m2.setIntProperty("msg",2);
+        producer.send(m2);
+        BytesMessage m3 = producerSession.createBytesMessage();
+        m3.writeBytes(new byte[128]);
+        m3.setIntProperty("msg",3);
+        producer.send(m3);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+
+        Connection consumerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+        Session consumerSession1 = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession_0_8)consumerSession1).setPrefecthLimits(0,256);
+        MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+        consumerConnection.start();
+
+        Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+
+        Message r2 = recv1.receiveNoWait();
+        assertNull("Second message incorrectly delivered", r2);
+        
+        Session consumerSession2 = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession_0_8)consumerSession2).setPrefecthLimits(0,256);
+        MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
+
+
+        r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        Message r3 = recv2.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+
+
+        r2.acknowledge();
+        r3.acknowledge();
+        recv1.close();
+        recv2.close();
+        consumerSession1.close();
+        consumerSession2.close();
+        consumerConnection.close();
+
+    }
+
+}


Reply via email to