Author: ritchiem
Date: Thu Apr 19 08:10:10 2007
New Revision: 530444

URL: http://svn.apache.org/viewvc?view=rev&rev=530444
Log:
QPID-454 - The message 'taken' notion was tracked per message. Adjusted it to 
be tracked per message per queue.

The previous commit was not sufficiently tested; other bugs, unrelated to 
this change, were causing the problems seen.

Modified:
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530444&r1=530443&r2=530444
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Thu Apr 19 08:10:10 2007
@@ -80,18 +80,17 @@
      */
     private boolean _immediate;
 
-    private AtomicBoolean _taken = new AtomicBoolean(false);
+    //    private Subscription _takenBySubcription;
+    //    private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new 
TransientMessageData();
 
-    private Subscription _takenBySubcription;
+
     private Set<Subscription> _rejectedBy = null;
+
+
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, 
AtomicBoolean>();
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new 
HashMap<AMQQueue, Subscription>();
 
-    public boolean isTaken(AMQQueue queue)
-    {
-        return _taken.get();
-    }
 
     private final int hashcode = System.identityHashCode(this);
 
@@ -206,7 +205,7 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-        _taken = new AtomicBoolean(false);
+//        _taken = new AtomicBoolean(false);
 
         if (_log.isDebugEnabled())
         {
@@ -326,7 +325,6 @@
 
         for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
-            _takenMap.put(q, new AtomicBoolean(false));
             _messageHandle.enqueue(storeContext, _messageId, q);
         }
 
@@ -459,17 +457,53 @@
         return _deliveredToConsumer;
     }
 
-
-    public boolean taken(AMQQueue queue, Subscription sub)
+    public boolean isTaken(AMQQueue queue)
     {
-        if (_taken.getAndSet(true))
+        //return _taken.get();
+
+        synchronized (this)
         {
-            return true;
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+                _takenMap.put(queue, taken);
+            }
+
+            return taken.get();
         }
-        else
+    }
+
+    public boolean taken(AMQQueue queue, Subscription sub)
+    {
+//        if (_taken.getAndSet(true))
+//        {
+//            return true;
+//        }
+//        else
+//        {
+//            _takenBySubcription = sub;
+//            return false;
+//        }
+
+        synchronized (this)
         {
-            _takenBySubcription = sub;
-            return false;
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+            }
+
+            if (taken.getAndSet(true))
+            {
+                return true;
+            }
+            else
+            {
+                _takenMap.put(queue, taken);
+                _takenBySubcriptionMap.put(queue, sub);
+                return false;
+            }
         }
     }
 
@@ -479,8 +513,26 @@
         {
             _log.trace("Releasing Message:" + debugIdentity());
         }
-        _taken.set(false);
-        _takenBySubcription = null;
+
+//        _taken.set(false);
+//        _takenBySubcription = null;
+
+
+        synchronized (this)
+        {
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+            }
+            else
+            {
+                taken.set(false);
+            }
+
+            _takenMap.put(queue, taken);
+            _takenBySubcriptionMap.put(queue, null);
+        }
     }
 
     public boolean checkToken(Object token)
@@ -833,16 +885,20 @@
 
     public String toString()
     {
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref 
count: " + _referenceCount + "; taken : " +
-               _taken + " by :" + _takenBySubcription;
+//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref 
count: " + _referenceCount + "; taken : " +
+//               _taken + " by :" + _takenBySubcription;
 
-//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref 
count: " + _referenceCount + "; taken for queues: " +
-//               _takenMap.toString() + " by Subs:" + 
_takenBySubcriptionMap.toString();
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref 
count: " + _referenceCount + "; taken for queues: " +
+               _takenMap.toString() + " by Subs:" + 
_takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-        return _takenBySubcription;
+//        return _takenBySubcription;
+        synchronized (this)
+        {
+            return _takenBySubcriptionMap.get(queue);
+        }
     }
 
     public void reject(Subscription subscription)


Reply via email to