Author: aidan
Date: Fri May 16 08:20:52 2008
New Revision: 657101

URL: http://svn.apache.org/viewvc?rev=657101&view=rev
Log:
Merged revisions 653416 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x

........
  r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line
  
  QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey
........

Modified:
    incubator/qpid/branches/M2.1.x/   (props changed)
    
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java

Propchange: incubator/qpid/branches/M2.1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=657101&r1=657100&r2=657101&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Fri May 16 08:20:52 2008
@@ -469,7 +469,7 @@
 
         synchronized (_unacknowledgedMessageMap.getLock())
         {
-            _unacknowledgedMessageMap.add(deliveryTag, new 
UnacknowledgedMessage(entry, consumerTag, deliveryTag));
+            _unacknowledgedMessageMap.add(deliveryTag, new 
UnacknowledgedMessage(entry, consumerTag, 
deliveryTag,_unacknowledgedMessageMap));
             checkSuspension();
         }
     }

Modified: 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=657101&r1=657100&r2=657101&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
 Fri May 16 08:20:52 2008
@@ -34,13 +34,18 @@
     public final long deliveryTag;
 
     private boolean _queueDeleted;
+    private final UnacknowledgedMessageMap _unacknowledgeMessageMap;
 
 
-    public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, 
long deliveryTag)
+    public UnacknowledgedMessage(QueueEntry entry,
+                                 AMQShortString consumerTag,
+                                 long deliveryTag,
+                                 final UnacknowledgedMessageMap 
unacknowledgedMessageMap)
     {
         this.entry = entry;
         this.consumerTag = consumerTag;
         this.deliveryTag = deliveryTag;
+        _unacknowledgeMessageMap = unacknowledgedMessageMap;
     }
 
     public String toString()
@@ -60,12 +65,20 @@
 
     public void discard(StoreContext storeContext) throws AMQException
     {
-        if (entry.getQueue() != null)
+        synchronized(_unacknowledgeMessageMap)
         {
-            entry.getQueue().dequeue(storeContext, entry);
+            if(_unacknowledgeMessageMap.contains(deliveryTag))
+            {
+
+                if (entry.getQueue() != null)
+                {
+                    entry.getQueue().dequeue(storeContext, entry);
+                }
+                //if the queue is null then the message is waiting to be 
acked, but has been removed.
+                entry.getMessage().decrementReference(storeContext);
+            }
         }
-        //if the queue is null then the message is waiting to be acked, but 
has been removed.
-        entry.getMessage().decrementReference(storeContext);
+        
     }
 
     public AMQMessage getMessage()

Modified: 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=657101&r1=657100&r2=657101&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1.x/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 Fri May 16 08:20:52 2008
@@ -139,7 +139,7 @@
                 };
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, 
txnContext);
-                _map.add(deliveryTag, new UnacknowledgedMessage(new 
QueueEntry(null,message), null, deliveryTag));
+                _map.add(deliveryTag, new UnacknowledgedMessage(new 
QueueEntry(null,message), null, deliveryTag, _map));
             }
             _acked = acked;
             _unacked = unacked;


Reply via email to