Author: ritchiem
Date: Fri Jun 13 07:56:45 2008
New Revision: 667561

URL: http://svn.apache.org/viewvc?rev=667561&view=rev
Log:
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. 
Added a new InternalBrokerBaseCase for performing testing on the broker without 
using the client libraries. This allows for testing closer to AMQP.

Further investigation is required to identify why the .Net client was causing 
the refcounting problems that required the previous change to 
UnacknowledgedMessage, which introduced this leak.

Added:
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
   (with props)
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
   (with props)
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
   (with props)
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
   (with props)
Removed:
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
    
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Modified:
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 Fri Jun 13 07:56:45 2008
@@ -26,6 +26,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
 
 public interface UnacknowledgedMessageMap
 {
@@ -55,7 +56,7 @@
 
     UnacknowledgedMessage remove(long deliveryTag);
 
-    void drainTo(Collection<UnacknowledgedMessage> destination, long 
deliveryTag) throws AMQException;
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws 
AMQException;
 
     Collection<UnacknowledgedMessage> cancelAllMessages();
 

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 Fri Jun 13 07:56:45 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.ack;
 
+import org.apache.qpid.server.store.StoreContext;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -160,7 +161,7 @@
         }
     }
 
-    public void drainTo(Collection<UnacknowledgedMessage> destination, long 
deliveryTag) throws AMQException
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws 
AMQException
     {
         synchronized (_lock)
         {
@@ -176,10 +177,12 @@
                                            " When deliveryTag is:" + 
deliveryTag + "ES:" + _map.entrySet().toString());
                 }
 
+                //Message has been ack so discard it. This will dequeue and 
decrement the reference.
+                unacked.getValue().discard(storeContext);
+
                 it.remove();
                 _unackedSize -= unacked.getValue().getMessage().getSize();
 
-                destination.add(unacked.getValue());
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Fri Jun 13 07:56:45 2008
@@ -159,28 +159,13 @@
                     throw new AMQException("Multiple ack on delivery tag " + 
deliveryTag + " not known for channel");
                 }
 
-                LinkedList<UnacknowledgedMessage> acked = new 
LinkedList<UnacknowledgedMessage>();
-                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
-                for (UnacknowledgedMessage msg : acked)
-                {
-                    if (debug)
-                    {
-                        _log.debug("Discarding message: " + 
msg.getMessage().getMessageId());
-                    }
-                    if(msg.getMessage().isPersistent())
-                    {
-                        beginTranIfNecessary();
-                    }
-
-                    //Message has been ack so discard it. This will dequeue 
and decrement the reference.
-                    msg.discard(_storeContext);
-                }
+                unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
             }
         }
         else
         {
             UnacknowledgedMessage msg;
-            msg = unacknowledgedMessageMap.remove(deliveryTag);
+            msg = unacknowledgedMessageMap.get(deliveryTag);
 
             if (msg == null)
             {
@@ -202,6 +187,8 @@
             //Message has been ack so discard it. This will dequeue and 
decrement the reference.
             msg.discard(_storeContext);
 
+            unacknowledgedMessageMap.remove(deliveryTag);
+
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with 
delivery tag " + deliveryTag + " msg id " +

Added: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java?rev=667561&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
 (added)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
 Fri Jun 13 07:56:45 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+    public void testTransactionalSingleAck() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(1, 1, 1, false, 0);
+    }
+
+    public void testTransactionalMultiAck() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(10, 1, 5, true, 5);
+    }
+
+    public void testTransactionalAckAll() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(10, 1, 0, true, 0);
+    }
+
+    public void testNonTransactionalSingleAck() throws AMQException
+    {
+        runMessageAck(1, 1, 1, false, 0);
+    }
+
+    public void testNonTransactionalMultiAck() throws AMQException
+    {
+        runMessageAck(10, 1, 5, true, 5);
+    }
+
+    public void testNonTransactionalAckAll() throws AMQException
+    {
+        runMessageAck(10, 1, 0, true, 0);
+    }
+
+    protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, 
long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int 
remainingUnackedMessages) throws AMQException
+    {
+        //Check store is empty
+        checkStoreContents(0);
+
+        //Send required messsages to the queue
+        publishMessages(_session, _channel, sendMessageCount);
+
+        if (_channel.isTransactional())
+        {
+            _channel.commit();
+        }
+
+        //Ensure they are stored
+        checkStoreContents(sendMessageCount);
+
+        //Check that there are no unacked messages
+        assertEquals("Channel should have no unacked msgs ", 0, 
_channel.getUnacknowledgedMessageMap().size());
+
+        //Subscribe to the queue
+        AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+        _queue.deliverAsync();
+
+        //Wait for the messages to be delivered
+        _session.awaitDelivery(sendMessageCount);
+
+        //Check that they are all waiting to be acknoledged
+        assertEquals("Channel should have unacked msgs", sendMessageCount, 
_channel.getUnacknowledgedMessageMap().size());
+
+        List<InternalTestProtocolSession.DeliveryPair> messages = 
_session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+        //Double check we received the right number of messages
+        assertEquals(sendMessageCount, messages.size());
+
+        //Check that the first message has the expected deliveryTag
+        assertEquals("First message does not have expected deliveryTag", 
firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+        //Send required Acknowledgement
+        _channel.acknowledgeMessage(acknowledgeDeliveryTag, 
acknowldegeMultiple);
+
+        if (_channel.isTransactional())
+        {
+            _channel.commit();
+        }
+
+        // Check Remaining Acknowledgements
+        assertEquals("Channel unacked msgs count incorrect", 
remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+        //Check store contents are also correct.
+        checkStoreContents(remainingUnackedMessages);
+    }
+
+}

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=667561&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
 (added)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
 Fri Jun 13 07:56:45 2008
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession 
implements ProtocolOutputConverter
+{
+    // ChannelID(LIST)  -> LinkedList<Pair>
+    final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> 
_channelDelivers;
+    private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+    public InternalTestProtocolSession() throws AMQException
+    {
+        super(new TestIoSession(),
+              ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+              new AMQCodecFactory(true));
+
+        _channelDelivers = new HashMap<Integer, Map<AMQShortString, 
LinkedList<DeliveryPair>>>();
+
+    }
+
+    public ProtocolOutputConverter getProtocolOutputConverter()
+    {
+        return this;
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return (byte) 8;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return (byte) 0;
+    }
+
+    // ***
+
+    public List<DeliveryPair> getDelivers(int channelId, AMQShortString 
consumerTag, int count)
+    {
+        synchronized (_channelDelivers)
+        {
+            List<DeliveryPair> msgs = 
_channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+            List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+            //Remove the msgs from the receivedList.
+            msgs.clear();
+
+            return response;
+        }
+    }
+
+    // *** ProtocolOutputConverter Implementation
+    public void writeReturn(AMQMessage message, int channelId, int replyCode, 
AMQShortString replyText) throws AMQException
+    {
+    }
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString 
consumerTag)
+    {
+    }
+
+    public void writeDeliver(AMQMessage message, int channelId, long 
deliveryTag, AMQShortString consumerTag) throws AMQException
+    {
+        _deliveryCount.incrementAndGet();
+
+        synchronized (_channelDelivers)
+        {
+            Map<AMQShortString, LinkedList<DeliveryPair>> consumers = 
_channelDelivers.get(channelId);
+            
+            if (consumers == null)
+            {
+                consumers = new HashMap<AMQShortString, 
LinkedList<DeliveryPair>>();
+                _channelDelivers.put(channelId, consumers);
+            }
+
+            LinkedList<DeliveryPair> consumerDelivers = 
consumers.get(consumerTag);
+
+            if (consumerDelivers == null)
+            {
+                consumerDelivers = new LinkedList<DeliveryPair>();
+                consumers.put(consumerTag, consumerDelivers);
+            }
+
+            consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+        }
+    }
+
+    public void writeGetOk(AMQMessage message, int channelId, long 
deliveryTag, int queueSize) throws AMQException
+    {
+    }
+
+    public void awaitDelivery(int msgs)
+    {
+        int start = _deliveryCount.get();
+
+        while ((start + msgs) > _deliveryCount.get())
+        {
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public class DeliveryPair
+    {
+        private long _deliveryTag;
+        private AMQMessage _message;
+
+        public DeliveryPair(long deliveryTag, AMQMessage message)
+        {
+            _deliveryTag = deliveryTag;
+            _message = message;
+        }
+
+        public AMQMessage getMessage()
+        {
+            return _message;
+        }
+
+        public long getDeliveryTag()
+        {
+            return _deliveryTag;
+        }
+    }
+}

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Fri Jun 13 07:56:45 2008
@@ -32,7 +32,7 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -171,7 +171,7 @@
     */
     public void testQueueDepthAlertWithSubscribers() throws Exception
     {
-        protocolSession = new TestMinaProtocolSession();
+        protocolSession = new InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
         protocolSession.addChannel(channel);
 

Modified: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Fri Jun 13 07:56:45 2008
@@ -29,7 +29,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -121,7 +121,7 @@
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
 
-        TestMinaProtocolSession protocolSession = new 
TestMinaProtocolSession();
+        InternalTestProtocolSession protocolSession = new 
InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
         protocolSession.addChannel(channel);
 
@@ -277,7 +277,7 @@
         _queue = new AMQQueue(new AMQShortString("testQueue"), false, new 
AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
         _queueMBean = new AMQQueueMBean(_queue);
 
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
     }
 
     private void sendMessages(int messageCount, boolean persistent) throws 
AMQException

Added: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=667561&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 (added)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 Fri Jun 13 07:56:45 2008
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+    protected IApplicationRegistry _registry;
+    protected MessageStore _messageStore;
+    protected AMQChannel _channel;
+    protected InternalTestProtocolSession _session;
+    protected VirtualHost _virtualHost;
+    protected StoreContext _storeContext = new StoreContext();
+    protected AMQQueue _queue;
+    protected AMQShortString QUEUE_NAME;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _registry = new TestApplicationRegistry();
+        ApplicationRegistry.initialise(_registry);
+        _virtualHost = 
_registry.getVirtualHostRegistry().getVirtualHost("test");
+        _messageStore = _virtualHost.getMessageStore();
+
+        QUEUE_NAME = new AMQShortString("test");
+        _queue = new AMQQueue(QUEUE_NAME, false, new 
AMQShortString("testowner"), false, _virtualHost);
+
+        _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+        Exchange defaultExchange = 
_virtualHost.getExchangeRegistry().getDefaultExchange();
+
+        _queue.bind(QUEUE_NAME, null, defaultExchange);
+
+        _session = new InternalTestProtocolSession();
+
+        _channel = new AMQChannel(_session, 1, _messageStore);
+
+        _session.addChannel(_channel);
+    }
+
+    public void tearDown() throws Exception
+    {
+        ApplicationRegistry.removeAll();
+        super.tearDown();
+    }
+
+    protected void checkStoreContents(int messageCount)
+    {
+        assertEquals("Message header count incorrect in the MetaDataMap", 
messageCount, ((TestableMemoryMessageStore) 
_messageStore).getMessageMetaDataMap().size());
+
+        //The above publish message is sufficiently small not to fit in the 
header so no Body is required.
+        //assertEquals("Message body count incorrect in the ContentBodyMap", 
messageCount, ((TestableMemoryMessageStore) 
_messageStore).getContentBodyMap().size());
+    }
+
+    protected AMQShortString subscribe(InternalTestProtocolSession session, 
AMQChannel channel, AMQQueue queue)
+    {
+        try
+        {
+            return channel.subscribeToQueue(null, queue, session, true, null, 
false, true);
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+        catch (ConsumerTagNotUniqueException e)
+        {
+            fail(e.getMessage());
+        }
+        //Keep the compiler happy
+        return null;
+    }
+
+    public void publishMessages(InternalTestProtocolSession session, 
AMQChannel channel, int messages) throws AMQException
+    {
+        MessagePublishInfo info = new MessagePublishInfo()
+        {
+            public AMQShortString getExchange()
+            {
+                return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+
+            }
+
+            public boolean isImmediate()
+            {
+                return false;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return new AMQShortString("test");
+            }
+        };
+
+        for (int count = 0; count < messages; count++)
+        {
+            channel.setPublishFrame(info, session, 
_virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+            //Set the body size
+            ContentHeaderBody _headerBody = new ContentHeaderBody();
+            _headerBody.bodySize = 0;
+
+            //Set Minimum properties
+            BasicContentHeaderProperties properties = new 
BasicContentHeaderProperties();
+
+            properties.setExpiration(0L);
+            properties.setTimestamp(System.currentTimeMillis());
+            //Make Message Persistent
+            properties.setDeliveryMode((byte) 2);
+
+            _headerBody.properties = properties;
+
+            channel.publishContentHeader(_headerBody, session);
+        }
+
+    }
+}

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=667561&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
 (added)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
 Fri Jun 13 07:56:45 2008
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import 
org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import 
org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Properties;
+
+public class TestApplicationRegistry extends ApplicationRegistry
+{
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private ManagedObjectRegistry _managedObjectRegistry;
+
+    private ACLPlugin _accessManager;
+
+    private PrincipalDatabaseManager _databaseManager;
+
+    private AuthenticationManager _authenticationManager;
+
+    private MessageStore _messageStore;
+    private VirtualHost _vHost;
+
+    private VirtualHostRegistry _virtualHostRegistry;
+
+    public TestApplicationRegistry()
+    {
+        super(new MapConfiguration(new HashMap()));
+    }
+
+    public void initialise() throws Exception
+    {
+        Properties users = new Properties();
+
+        users.put("guest", "guest");
+
+        _databaseManager = new PropertiesPrincipalDatabaseManager("default", 
users);
+
+        _accessManager = new AllowAll();
+
+        _authenticationManager = new 
PrincipalDatabaseAuthenticationManager(null, null);
+
+        _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+        // We can't call getInstance here as we may be in the process of 
initialising ourselves!!!!
+        //ApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+        //_managedObjectRegistry = appRegistry.getManagedObjectRegistry();
+        //_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+
+        _messageStore = new TestableMemoryMessageStore();
+
+        _virtualHostRegistry = new VirtualHostRegistry();
+
+        _vHost = new VirtualHost("test", _messageStore);
+
+        _virtualHostRegistry.registerVirtualHost(_vHost);
+
+        _queueRegistry = _vHost.getQueueRegistry();
+        _exchangeFactory = _vHost.getExchangeFactory();
+        _exchangeRegistry = _vHost.getExchangeRegistry();
+
+        _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+    }
+
+    public Configuration getConfiguration()
+    {
+        return _configuration;
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return _managedObjectRegistry;
+    }
+
+    public PrincipalDatabaseManager getDatabaseManager()
+    {
+        return _databaseManager;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public Collection<String> getVirtualHostNames()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public VirtualHostRegistry getVirtualHostRegistry()
+    {
+        return _virtualHostRegistry;
+    }
+
+    public ACLPlugin getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public void setAccessManager(ACLPlugin newManager)
+    {
+        _accessManager = newManager;
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+
+    public PluginManager getPluginManager()
+    {
+        return null;
+    }
+}
+
+

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to