Author: rgodfrey
Date: Thu May 29 04:28:41 2008
New Revision: 661296

URL: http://svn.apache.org/viewvc?rev=661296&view=rev
Log:
tidy up

Modified:
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661296&r1=661295&r2=661296&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Thu May 29 04:28:41 2008
@@ -75,12 +75,8 @@
 
     private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
 
-
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    private final AtomicBoolean _quiesced = new AtomicBoolean(false);
-
-
     protected final SubscriptionList _subscriptionList = new 
SubscriptionList(this);
     private final AtomicReference<SubscriptionList.SubscriptionNode> 
_lastSubscriptionNode = new 
AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
 
@@ -124,7 +120,6 @@
     private final Set<NotificationCheck> _notificationChecks = 
EnumSet.noneOf(NotificationCheck.class);
 
 
-
     private final AtomicLong _stateChangeCount = new 
AtomicLong(Long.MIN_VALUE);
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
     private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -163,8 +158,6 @@
 
         _asyncDelivery = 
ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
-        AsyncDeliveryConfig.getAsyncDeliveryExecutor();
-
         try
         {
             _managedObject = new AMQQueueMBean(this);
@@ -278,7 +271,7 @@
         }
 
 
-        deliverAsync();
+        deliverAsync(subscription);
 
     }
 
@@ -346,6 +339,7 @@
 
         QueueEntry entry; 
         Subscription exclusiveSub = _exclusiveSubscriber;
+
         if(exclusiveSub != null)
         {
             exclusiveSub.getSendLock();
@@ -353,6 +347,7 @@
             try
             {
                 entry = _entries.add(message);
+
                 deliverToSubscription(exclusiveSub, entry);
 
 
@@ -405,7 +400,7 @@
             // this catches the case where we *just* miss an update
             int loops = 2;
 
-            while(!entry.isAcquired() && loops != 0)
+            while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
             {
                 if(nextNode == null)
                 {
@@ -454,6 +449,8 @@
     private void deliverToSubscription(final Subscription sub, final 
QueueEntry entry)
             throws AMQException
     {
+        // the send lock is a read/write lock that prevents the subscription 
from changing status while we are in this
+        // block
         sub.getSendLock();
         try
         {
@@ -465,10 +462,14 @@
                 {
                     if(!sub.isBrowser() && !entry.acquire(sub))
                     {
+                        // restore credit here that would have been taken away 
by wouldSuspend since we didn't manage
+                        // to acquire the entry for this subscription
                         sub.restoreCredit(entry);
                     }
                     else
                     {
+                        // Update the last seen marker for this subscription, 
if some other process hasn't already
+                        // updated it
                         QueueEntry queueEntryNode =  sub.getLastSeenEntry();
                         if(_entries.next(queueEntryNode) == entry)
                         {
@@ -490,6 +491,7 @@
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
         // This method is only required for queues which mess with ordering
+        // Simple Queues don't :-)
     }
 
     private void incrementQueueSize(final AMQMessage message)
@@ -513,6 +515,8 @@
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, 
final QueueEntry entry)
     {
 
+        // We need to move this subscription on, past entries which are 
already acquired, or deleted or ones it has no
+        // interest in.
         QueueEntry node = sub.getLastSeenEntry();
         while(node != null && (node.isAcquired() || node.isDeleted() || 
!sub.hasInterest(node)) )
         {
@@ -531,28 +535,40 @@
 
         }
 
+
         if(node == entry)
         {
+            // If the first entry that subscription can process is the one we 
are trying to deliver to it, then we are
+            // good
             return true;
         }
         else
         {
-            node = sub.getLastSeenEntry();
-            if(node != null && entry.compareTo(node) < 0 && 
sub.hasInterest(entry))
+            // Otherwise we should try to update the subscription's last seen 
entry to the entry we got to, providing
+            // no-one else has updated it to something further on in the list
+            updateLastSeenEntry(sub, entry);
+            return false;
+        }
+
+    }
+
+    private void updateLastSeenEntry(final Subscription sub, final QueueEntry 
entry)
+    {
+        QueueEntry node = sub.getLastSeenEntry();
+
+        if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+        {
+            do
             {
-                do
+                if(sub.setLastSeenEntry(node,entry))
                 {
-                    if(sub.setLastSeenEntry(node,entry))
-                    {
-                        return true;
-                    }
-                    else
-                    {
-                        node = sub.getLastSeenEntry();
-                    }
-                } while (node != null && entry.compareTo(node) < 0);
-            }
-            return false;
+                    return;
+                }
+                else
+                {
+                    node = sub.getLastSeenEntry();
+                }
+            } while (node != null && entry.compareTo(node) < 0);
         }
 
     }
@@ -569,18 +585,7 @@
             // we don't make browsers send the same stuff twice
             if(!sub.isBrowser())
             {
-                QueueEntry subEntry = sub.getLastSeenEntry();
-                while(subEntry != null && entry.compareTo(subEntry)<0)
-                {
-                    if(sub.setLastSeenEntry(subEntry,entry))
-                    {
-                        break;
-                    }
-                    else
-                    {
-                        subEntry = sub.getLastSeenEntry();
-                    }
-                }
+                updateLastSeenEntry(sub, entry);
             }
         }
 
@@ -1307,6 +1312,10 @@
 
         }
 
+        // if there's (potentially) more than one subscription the others will 
potentially not have been advanced to the
+        // next entry they are interested in yet.  This would lead to holding 
on to references to expired messages, etc
+        // which would give us a memory "leak".
+
         if(!isExclusiveSubscriber())
         {
             advanceAllSubscriptions();
@@ -1494,7 +1503,8 @@
             _asynchronousRunner.set(null);
         }
 
-
+        // If deliveries == 0 then the limiting factor was the time-slicing 
rather than available messages or credit
+        // therefore we should schedule this runner again (unless someone 
beats us to it :-) ).
         if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
         {
             _asyncDelivery.execute(runner);
@@ -1616,43 +1626,36 @@
         return _notificationChecks;
     }
 
-
-
-
-
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-    public int N(final Object o)
-    {
-        return _name.compareTo(((AMQQueue) o).getName());
-    }
-
     private final class QueueEntryListener implements 
QueueEntry.StateChangeListener
     {
         private final QueueEntry _entry;
+        private final Subscription _sub;
 
         public QueueEntryListener(final Subscription sub, final QueueEntry 
entry)
         {
             _entry = entry;
+            _sub = sub;
         }
 
         public boolean equals(Object o)
         {
-            return _entry == ((QueueEntryListener)o)._entry;
+            return _entry == ((QueueEntryListener)o)._entry && _sub == 
((QueueEntryListener)o)._sub;
         }
 
         public int hashCode()
         {
-            return System.identityHashCode(_entry);
+            return System.identityHashCode(_entry) ^ 
System.identityHashCode(_sub);
         }
 
         public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, 
QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
-            deliverAsync();
+            deliverAsync(_sub);
         }
     }
 }
\ No newline at end of file


Reply via email to