Author: ritchiem
Date: Fri Jun 13 07:56:45 2008
New Revision: 667561
URL: http://svn.apache.org/viewvc?rev=667561&view=rev
Log:
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking.
Added a new InternalBrokerBaseCase for performing testing on the broker without
using the client libraries. This allows for testing closer to AMQP.
Further investigation is required to identify why the .Net client was causing
the refcounting problems that required the previous change to
UnacknowledgedMessage, which introduced this leak.
Added:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
(with props)
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
(with props)
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
(with props)
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
(with props)
Removed:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
Fri Jun 13 07:56:45 2008
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,7 +56,7 @@
UnacknowledgedMessage remove(long deliveryTag);
- void drainTo(Collection<UnacknowledgedMessage> destination, long
deliveryTag) throws AMQException;
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws
AMQException;
Collection<UnacknowledgedMessage> cancelAllMessages();
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
Fri Jun 13 07:56:45 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,7 @@
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long
deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws
AMQException
{
synchronized (_lock)
{
@@ -176,10 +177,12 @@
" When deliveryTag is:" +
deliveryTag + "ES:" + _map.entrySet().toString());
}
+ //Message has been ack so discard it. This will dequeue and
decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Fri Jun 13 07:56:45 2008
@@ -159,28 +159,13 @@
throw new AMQException("Multiple ack on delivery tag " +
deliveryTag + " not known for channel");
}
- LinkedList<UnacknowledgedMessage> acked = new
LinkedList<UnacknowledgedMessage>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (UnacknowledgedMessage msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue
and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
UnacknowledgedMessage msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -202,6 +187,8 @@
//Message has been ack so discard it. This will dequeue and
decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with
delivery tag " + deliveryTag + " msg id " +
Added:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java?rev=667561&view=auto
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
(added)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
Fri Jun 13 07:56:45 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+ public void testTransactionalSingleAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testTransactionalMultiAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testTransactionalAckAll() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ public void testNonTransactionalSingleAck() throws AMQException
+ {
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testNonTransactionalMultiAck() throws AMQException
+ {
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testNonTransactionalAckAll() throws AMQException
+ {
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ protected void runMessageAck(int sendMessageCount, long firstDeliveryTag,
long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int
remainingUnackedMessages) throws AMQException
+ {
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0,
_channel.getUnacknowledgedMessageMap().size());
+
+ //Subscribe to the queue
+ AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for the messages to be delivered
+ _session.awaitDelivery(sendMessageCount);
+
+ //Check that they are all waiting to be acknoledged
+ assertEquals("Channel should have unacked msgs", sendMessageCount,
_channel.getUnacknowledgedMessageMap().size());
+
+ List<InternalTestProtocolSession.DeliveryPair> messages =
_session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+ //Double check we received the right number of messages
+ assertEquals(sendMessageCount, messages.size());
+
+ //Check that the first message has the expected deliveryTag
+ assertEquals("First message does not have expected deliveryTag",
firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+ //Send required Acknowledgement
+ _channel.acknowledgeMessage(acknowledgeDeliveryTag,
acknowldegeMultiple);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ // Check Remaining Acknowledgements
+ assertEquals("Channel unacked msgs count incorrect",
remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+ //Check store contents are also correct.
+ checkStoreContents(remainingUnackedMessages);
+ }
+
+}
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=667561&view=auto
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
(added)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
Fri Jun 13 07:56:45 2008
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession
implements ProtocolOutputConverter
+{
+ // ChannelID(LIST) -> LinkedList<Pair>
+ final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>
_channelDelivers;
+ private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+ public InternalTestProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+ new AMQCodecFactory(true));
+
+ _channelDelivers = new HashMap<Integer, Map<AMQShortString,
LinkedList<DeliveryPair>>>();
+
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return this;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return (byte) 8;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return (byte) 0;
+ }
+
+ // ***
+
+ public List<DeliveryPair> getDelivers(int channelId, AMQShortString
consumerTag, int count)
+ {
+ synchronized (_channelDelivers)
+ {
+ List<DeliveryPair> msgs =
_channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+ List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+ //Remove the msgs from the receivedList.
+ msgs.clear();
+
+ return response;
+ }
+ }
+
+ // *** ProtocolOutputConverter Implementation
+ public void writeReturn(AMQMessage message, int channelId, int replyCode,
AMQShortString replyText) throws AMQException
+ {
+ }
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString
consumerTag)
+ {
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long
deliveryTag, AMQShortString consumerTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers =
_channelDelivers.get(channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString,
LinkedList<DeliveryPair>>();
+ _channelDelivers.put(channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers =
consumers.get(consumerTag);
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(consumerTag, consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ }
+ }
+
+ public void writeGetOk(AMQMessage message, int channelId, long
deliveryTag, int queueSize) throws AMQException
+ {
+ }
+
+ public void awaitDelivery(int msgs)
+ {
+ int start = _deliveryCount.get();
+
+ while ((start + msgs) > _deliveryCount.get())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public class DeliveryPair
+ {
+ private long _deliveryTag;
+ private AMQMessage _message;
+
+ public DeliveryPair(long deliveryTag, AMQMessage message)
+ {
+ _deliveryTag = deliveryTag;
+ _message = message;
+ }
+
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ }
+}
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Fri Jun 13 07:56:45 2008
@@ -32,7 +32,7 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
@@ -171,7 +171,7 @@
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- protocolSession = new TestMinaProtocolSession();
+ protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
Modified:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=667561&r1=667560&r2=667561&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Fri Jun 13 07:56:45 2008
@@ -29,7 +29,7 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -121,7 +121,7 @@
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new
TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new
InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -277,7 +277,7 @@
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new
AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws
AMQException
Added:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=667561&view=auto
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
(added)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
Fri Jun 13 07:56:45 2008
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+ protected IApplicationRegistry _registry;
+ protected MessageStore _messageStore;
+ protected AMQChannel _channel;
+ protected InternalTestProtocolSession _session;
+ protected VirtualHost _virtualHost;
+ protected StoreContext _storeContext = new StoreContext();
+ protected AMQQueue _queue;
+ protected AMQShortString QUEUE_NAME;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _registry = new TestApplicationRegistry();
+ ApplicationRegistry.initialise(_registry);
+ _virtualHost =
_registry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ QUEUE_NAME = new AMQShortString("test");
+ _queue = new AMQQueue(QUEUE_NAME, false, new
AMQShortString("testowner"), false, _virtualHost);
+
+ _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+ Exchange defaultExchange =
_virtualHost.getExchangeRegistry().getDefaultExchange();
+
+ _queue.bind(QUEUE_NAME, null, defaultExchange);
+
+ _session = new InternalTestProtocolSession();
+
+ _channel = new AMQChannel(_session, 1, _messageStore);
+
+ _session.addChannel(_channel);
+ }
+
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
+ protected void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap",
messageCount, ((TestableMemoryMessageStore)
_messageStore).getMessageMetaDataMap().size());
+
+ //The above publish message is sufficiently small not to fit in the
header so no Body is required.
+ //assertEquals("Message body count incorrect in the ContentBodyMap",
messageCount, ((TestableMemoryMessageStore)
_messageStore).getContentBodyMap().size());
+ }
+
+ protected AMQShortString subscribe(InternalTestProtocolSession session,
AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ return channel.subscribeToQueue(null, queue, session, true, null,
false, true);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
+ public void publishMessages(InternalTestProtocolSession session,
AMQChannel channel, int messages) throws AMQException
+ {
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("test");
+ }
+ };
+
+ for (int count = 0; count < messages; count++)
+ {
+ channel.setPublishFrame(info, session,
_virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+ //Set the body size
+ ContentHeaderBody _headerBody = new ContentHeaderBody();
+ _headerBody.bodySize = 0;
+
+ //Set Minimum properties
+ BasicContentHeaderProperties properties = new
BasicContentHeaderProperties();
+
+ properties.setExpiration(0L);
+ properties.setTimestamp(System.currentTimeMillis());
+ //Make Message Persistent
+ properties.setDeliveryMode((byte) 2);
+
+ _headerBody.properties = properties;
+
+ channel.publishContentHeader(_headerBody, session);
+ }
+
+ }
+}
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=667561&view=auto
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
(added)
+++
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Fri Jun 13 07:56:45 2008
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import
org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import
org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Properties;
+
+public class TestApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private ManagedObjectRegistry _managedObjectRegistry;
+
+ private ACLPlugin _accessManager;
+
+ private PrincipalDatabaseManager _databaseManager;
+
+ private AuthenticationManager _authenticationManager;
+
+ private MessageStore _messageStore;
+ private VirtualHost _vHost;
+
+ private VirtualHostRegistry _virtualHostRegistry;
+
+ public TestApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ Properties users = new Properties();
+
+ users.put("guest", "guest");
+
+ _databaseManager = new PropertiesPrincipalDatabaseManager("default",
users);
+
+ _accessManager = new AllowAll();
+
+ _authenticationManager = new
PrincipalDatabaseAuthenticationManager(null, null);
+
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+ // We can't call getInstance here as we may be in the process of
initialising ourselves!!!!
+ //ApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ //_managedObjectRegistry = appRegistry.getManagedObjectRegistry();
+ //_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+
+ _messageStore = new TestableMemoryMessageStore();
+
+ _virtualHostRegistry = new VirtualHostRegistry();
+
+ _vHost = new VirtualHost("test", _messageStore);
+
+ _virtualHostRegistry.registerVirtualHost(_vHost);
+
+ _queueRegistry = _vHost.getQueueRegistry();
+ _exchangeFactory = _vHost.getExchangeFactory();
+ _exchangeRegistry = _vHost.getExchangeRegistry();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public Configuration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public PrincipalDatabaseManager getDatabaseManager()
+ {
+ return _databaseManager;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public Collection<String> getVirtualHostNames()
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public ACLPlugin getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public void setAccessManager(ACLPlugin newManager)
+ {
+ _accessManager = newManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public PluginManager getPluginManager()
+ {
+ return null;
+ }
+}
+
+
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
------------------------------------------------------------------------------
svn:keywords = Rev Date