Author: ritchiem
Date: Wed Sep 26 03:42:54 2007
New Revision: 579574
URL: http://svn.apache.org/viewvc?rev=579574&view=rev
Log:
QPID-610 : Fix for ManagementConcole NO_ACK & Msg Expire leaks. Updated
AMQQueueMBeanTest to verify the contents of the MessageStore after clearing.
All the MC features only dequeued the message and didn't correctly
decrementReference.
The MessageReturnTest was failing due to the TimeToLive test causing messages
to be left on the store. The expired messages were not having the reference
decremented.
Added:
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
(contents, props changed)
- copied, changed from r579147,
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Wed Sep 26 03:42:54 2007
@@ -581,7 +581,7 @@
/** Removes the AMQMessage from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext)
throws AMQException
{
- _deliveryMgr.removeAMessageFromTop(storeContext);
+ _deliveryMgr.removeAMessageFromTop(storeContext, this);
}
/** removes all the messages from the queue. */
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Sep 26 03:42:54 2007
@@ -333,7 +333,7 @@
if (!acks)
{
- msg.decrementReference(channel.getStoreContext());
+ msg.decrementReference(channel.getStoreContext());
}
}
finally
@@ -407,11 +407,16 @@
*
* @throws AMQException
*/
- public void removeAMessageFromTop(StoreContext storeContext) throws
AMQException
+ public void removeAMessageFromTop(StoreContext storeContext, AMQQueue
queue) throws AMQException
{
_lock.lock();
AMQMessage message = _messages.poll();
+
+ message.dequeue(storeContext, queue);
+
+ message.decrementReference(storeContext);
+
if (message != null)
{
_totalMessageSize.addAndGet(-message.getSize());
@@ -434,6 +439,9 @@
_messages.poll();
_queue.dequeue(storeContext, msg);
+
+ msg.decrementReference(_reapingStoreContext);
+
msg = getNextMessage();
count++;
}
@@ -478,6 +486,8 @@
// Use the reapingStoreContext as any sub(if we have one) may
be in a tx.
message.dequeue(_reapingStoreContext, _queue);
+
+ message.decrementReference(_reapingStoreContext);
if (_log.isInfoEnabled())
{
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Wed Sep 26 03:42:54 2007
@@ -72,7 +72,7 @@
*/
void deliver(StoreContext storeContext, AMQShortString name, AMQMessage
msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
- void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
+ void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue)
throws AMQException;
long clearAllMessages(StoreContext storeContext) throws AMQException;
Modified:
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Wed Sep 26 03:42:54 2007
@@ -24,10 +24,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -36,6 +39,8 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
@@ -49,18 +54,16 @@
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore = new MemoryMessageStore();
+ private MessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new
NonTransactionalContext(_messageStore, _storeContext,
-
null,
-
new LinkedList<RequiredDeliveryException>(),
-
new HashSet<Long>());
+ private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
+ private AMQProtocolSession _protocolSession;
- public void testMessageCount() throws Exception
+ public void testMessageCountTransient() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount);
+ sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -73,6 +76,43 @@
_queueMBean.clearQueue();
assertTrue(_queueMBean.getMessageCount() == 0);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ public void testMessageCountPersistent() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true);
+ assertEquals("", messageCount,
_queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop();
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ // todo: collect to a general testing class -duplicated from
Systest/MessageReturntest
+ private void verifyBrokerState()
+ {
+
+ TestableMemoryMessageStore store = new
TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+ // Unlike MessageReturnTest there is no need for a delay as there this
thread does the clean up.
+ assertNotNull("ContentBodyMap should not be null",
store.getContentBodyMap());
+ assertEquals("Expected the store to have no content:" +
store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertNotNull("MessageMetaDataMap should not be null",
store.getMessageMetaDataMap());
+ assertEquals("Expected the store to have no metadata:" +
store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
}
public void testConsumerCount() throws AMQException
@@ -86,26 +126,26 @@
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore,
null);
protocolSession.addChannel(channel);
- _queue.registerProtocolSession(protocolSession, 1, new
AMQShortString("test"), false, null,false,false);
+ _queue.registerProtocolSession(protocolSession, 1, new
AMQShortString("test"), false, null, false, false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
SubscriptionFactory subscriptionFactory = new
SubscriptionImpl.Factory();
- Subscription s1 =
subscriptionFactory.createSubscription(channel.getChannelId(),
-
protocolSession,
- new
AMQShortString("S1"),
- false,
- null,
- true,
- _queue);
-
- Subscription s2 =
subscriptionFactory.createSubscription(channel.getChannelId(),
-
protocolSession,
- new
AMQShortString("S2"),
- false,
- null,
- true,
- _queue);
+ Subscription s1 =
subscriptionFactory.createSubscription(channel.getChannelId(),
+
protocolSession,
+ new
AMQShortString("S1"),
+ false,
+ null,
+ true,
+ _queue);
+
+ Subscription s2 =
subscriptionFactory.createSubscription(channel.getChannelId(),
+
protocolSession,
+ new
AMQShortString("S2"),
+ false,
+ null,
+ true,
+ _queue);
_subscribers.addSubscriber(s1);
_subscribers.addSubscriber(s2);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
@@ -165,7 +205,7 @@
}
- AMQMessage msg = message(false);
+ AMQMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
@@ -184,7 +224,7 @@
}
}
- private AMQMessage message(final boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate, boolean persistent)
throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -209,9 +249,11 @@
return null;
}
};
-
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties)
contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
return new AMQMessage(_messageStore.getNewMessageId(), publish,
_transactionalContext, contentHeaderBody);
}
@@ -221,22 +263,38 @@
super.setUp();
IApplicationRegistry applicationRegistry =
ApplicationRegistry.getInstance();
_virtualHost =
applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ _transactionalContext = new NonTransactionalContext(_messageStore,
_storeContext,
+ null,
+ new
LinkedList<RequiredDeliveryException>(),
+ new
HashSet<Long>());
+
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new
AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
+
+ _protocolSession = new TestMinaProtocolSession();
}
- private void sendMessages(int messageCount) throws AMQException
+ private void sendMessages(int messageCount, boolean persistent) throws
AMQException
{
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false);
- messages[i].enqueue(_queue);
- messages[i].routingComplete(_messageStore, _storeContext, new
MessageHandleFactory());
- }
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ AMQMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(_queue);
+
+ // route header
+ currentMessage.routingComplete(_messageStore, _storeContext, new
MessageHandleFactory());
+
+ // Add the body so we have somthing to test later
+ currentMessage.addContentBodyFrame(_storeContext,
+ _protocolSession.getRegistry()
+
.getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new
ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+
MESSAGE_SIZE)));
+
+
}
}
}
Copied:
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
(from r579147,
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java)
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?p2=incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java&p1=incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java&r1=579147&r2=579574&rev=579574&view=diff
==============================================================================
(empty)
Propchange:
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java?rev=579574&r1=579573&r2=579574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
(original)
+++
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
Wed Sep 26 03:42:54 2007
@@ -78,6 +78,8 @@
private CountDownLatch _returns = new CountDownLatch(1);
private int _receivedCount = 0;
+ private int _initialContentBodyMapSize;
+ private int _initilaMessageMetaDataMapSize;
protected void setUp() throws Exception
{
@@ -94,13 +96,19 @@
env.put("queue.badQueue", QUEUE);
_context = factory.getInitialContext(env);
+
+ getBrokerInitialState();
}
protected void tearDown() throws Exception
{
- _producerConnection.close();
super.tearDown();
+ if (_producerConnection != null)
+ {
+ _producerConnection.close();
+ }
+
if (BROKER.startsWith("vm://"))
{
TransportConnection.killAllVMBrokers();
@@ -130,7 +138,7 @@
_producerConnection.close();
//Verify we get all the messages.
- verifyAllMessagesRecevied();
+ verifyAllMessagesRecevied();
//Verify Broker state
verifyBrokerState();
}
@@ -153,6 +161,34 @@
_producer = _producerSession.createProducer((Queue)
_context.lookup("badQueue"));
}
+ // todo: collect to a general testing class - duplicated in
AMQQueueMBeanTest
+ private void getBrokerInitialState()
+ {
+ IApplicationRegistry registry = ApplicationRegistry.getInstance();
+
+ VirtualHost testVhost =
registry.getVirtualHostRegistry().getVirtualHost(VHOST);
+
+ assertNotNull("Unable to get test Vhost", testVhost.getMessageStore());
+
+ TestableMemoryMessageStore store = new
TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore());
+
+ _initialContentBodyMapSize = store.getContentBodyMap() == null ? 0 :
store.getContentBodyMap().size();
+ _initilaMessageMetaDataMapSize = store.getMessageMetaDataMap() == null
? 0 : store.getMessageMetaDataMap().size();
+
+ if (_initialContentBodyMapSize != 0)
+ {
+ _logger.warn("Store is dirty: ContentBodyMap has Size:" +
_initialContentBodyMapSize);
+ System.out.println("Store is dirty: ContentBodyMap has Size:" +
_initialContentBodyMapSize);
+ }
+
+ if (_initilaMessageMetaDataMapSize != 0)
+ {
+ _logger.warn("Store is dirty: MessageMetaDataMap has Size:" +
_initilaMessageMetaDataMapSize);
+ System.out.println("Store is dirty: MessageMetaDataMap has Size:"
+ _initilaMessageMetaDataMapSize);
+ }
+
+ }
+
private void verifyBrokerState()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
@@ -169,7 +205,7 @@
// If the CBM has content it may be due to the broker not yet purging.
// Closing the producer connection before testing should give the
store time to clean up.
// Perform a quick sleep just in case
- if (store.getContentBodyMap().size() != 0)
+ while (store.getContentBodyMap().size() > _initialContentBodyMapSize)
{
try
{
@@ -179,21 +215,9 @@
{
}
}
- assertEquals("Expected the store to have no content:" +
store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertTrue("Expected the store content size not reached at test start
it was :" + _initialContentBodyMapSize + " Now it is :" +
store.getContentBodyMap().size(), _initialContentBodyMapSize >=
store.getContentBodyMap().size());
assertNotNull("MessageMetaDataMap should not be null",
store.getMessageMetaDataMap());
-
- if (store.getMessageMetaDataMap().size() != 0)
- {
- try
- {
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- assertEquals("Expected the store to have no metadata:" +
store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+ assertTrue("Expected the store MessageMetaData size not reached at
test start it was :" + _initilaMessageMetaDataMapSize + " Now it is :" +
store.getMessageMetaDataMap().size(), _initialContentBodyMapSize >=
store.getMessageMetaDataMap().size());
}
private void verifyAllMessagesRecevied()
@@ -221,7 +245,7 @@
/**
* We can't verify messageOrder here as the return threads are not
synchronized so we have no way of
- * guarranting the order.
+ * guarranting the order.
*/
private void verifyMessageOrder()
{