Author: ritchiem
Date: Fri Jun 13 08:48:54 2008
New Revision: 667574
URL: http://svn.apache.org/viewvc?rev=667574&view=rev
Log:
Merged revisions 667561 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r667561 | ritchiem | 2008-06-13 15:56:45 +0100 (Fri, 13 Jun 2008) | 3 lines
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking.
Added a new InternalBrokerBaseCase for performing testing on the broker without
using the client libraries. This allows for testing closer to AMQP.
Further investigation is required to identify why the .Net was causing the
refcounting problems that required the previous change to Unacknowledged
message introducing this .
........
Added:
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
- copied unchanged from r667561,
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
- copied unchanged from r667561,
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
- copied unchanged from r667561,
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
- copied unchanged from r667561,
incubator/qpid/branches/M2.x/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Removed:
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Modified:
incubator/qpid/branches/M2.1.x/ (props changed)
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
Propchange: incubator/qpid/branches/M2.1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
Fri Jun 13 08:48:54 2008
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,7 +56,7 @@
UnacknowledgedMessage remove(long deliveryTag);
- void drainTo(Collection<UnacknowledgedMessage> destination, long
deliveryTag) throws AMQException;
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws
AMQException;
Collection<UnacknowledgedMessage> cancelAllMessages();
Modified:
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
Fri Jun 13 08:48:54 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -169,7 +170,7 @@
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long
deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws
AMQException
{
synchronized (_lock)
{
@@ -185,10 +186,12 @@
" When deliveryTag is:" +
deliveryTag + "ES:" + _map.entrySet().toString());
}
+ //Message has been ack so discard it. This will dequeue and
decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
Modified:
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Fri Jun 13 08:48:54 2008
@@ -155,31 +155,13 @@
throw new AMQException("Multiple ack on delivery tag " +
deliveryTag + " not known for channel");
}
- LinkedList<UnacknowledgedMessage> acked = new
LinkedList<UnacknowledgedMessage>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (UnacknowledgedMessage msg : acked)
- {
- if (!_browsedAcks.contains(deliveryTag))
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will
dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
UnacknowledgedMessage msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -189,20 +171,10 @@
_channel.getChannelId());
}
- if (!_browsedAcks.contains(deliveryTag))
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
- }
+ //Message has been ack so discard it. This will dequeue and
decrement the reference.
+ msg.discard(_storeContext);
- //Message has been ack so discard it. This will dequeue and
decrement the reference.
- msg.discard(_storeContext);
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
+ unacknowledgedMessageMap.remove(deliveryTag);
if (_log.isDebugEnabled())
{
Modified:
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Fri Jun 13 08:48:54 2008
@@ -32,7 +32,7 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
@@ -172,7 +172,7 @@
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- protocolSession = new TestMinaProtocolSession();
+ protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
Modified:
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Fri Jun 13 08:48:54 2008
@@ -29,7 +29,7 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -122,7 +122,7 @@
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new
TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new
InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -278,7 +278,7 @@
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new
AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws
AMQException
Modified:
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=667574&r1=667573&r2=667574&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
(original)
+++
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
Fri Jun 13 08:48:54 2008
@@ -28,7 +28,6 @@
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;