Author: aidan
Date: Mon May 5 03:24:50 2008
New Revision: 653416
URL: http://svn.apache.org/viewvc?rev=653416&view=rev
Log:
QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=653416&r1=653415&r2=653416&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Mon May 5 03:24:50 2008
@@ -469,7 +469,7 @@
synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.add(deliveryTag, new
UnacknowledgedMessage(entry, consumerTag, deliveryTag));
+ _unacknowledgedMessageMap.add(deliveryTag, new
UnacknowledgedMessage(entry, consumerTag,
deliveryTag,_unacknowledgedMessageMap));
checkSuspension();
}
}
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=653416&r1=653415&r2=653416&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
Mon May 5 03:24:50 2008
@@ -34,13 +34,18 @@
public final long deliveryTag;
private boolean _queueDeleted;
+ private final UnacknowledgedMessageMap _unacknowledgeMessageMap;
- public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag,
long deliveryTag)
+ public UnacknowledgedMessage(QueueEntry entry,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ final UnacknowledgedMessageMap
unacknowledgedMessageMap)
{
this.entry = entry;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
+ _unacknowledgeMessageMap = unacknowledgedMessageMap;
}
public String toString()
@@ -60,12 +65,20 @@
public void discard(StoreContext storeContext) throws AMQException
{
- if (entry.getQueue() != null)
+ synchronized(_unacknowledgeMessageMap)
{
- entry.getQueue().dequeue(storeContext, entry);
+ if(_unacknowledgeMessageMap.contains(deliveryTag))
+ {
+
+ if (entry.getQueue() != null)
+ {
+ entry.getQueue().dequeue(storeContext, entry);
+ }
+ //if the queue is null then the message is waiting to be
acked, but has been removed.
+ entry.getMessage().decrementReference(storeContext);
+ }
}
- //if the queue is null then the message is waiting to be acked, but
has been removed.
- entry.getMessage().decrementReference(storeContext);
+
}
public AMQMessage getMessage()
Modified:
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=653416&r1=653415&r2=653416&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
(original)
+++
incubator/qpid/branches/M2.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
Mon May 5 03:24:50 2008
@@ -137,7 +137,7 @@
};
TestMessage message = new TestMessage(deliveryTag, i, info,
txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new
QueueEntry(null,message), null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new
QueueEntry(null,message), null, deliveryTag, _map));
}
_acked = acked;
_unacked = unacked;