Author: rgodfrey
Date: Thu May 29 04:28:41 2008
New Revision: 661296
URL: http://svn.apache.org/viewvc?rev=661296&view=rev
Log:
tidy up
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661296&r1=661295&r2=661296&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Thu May 29 04:28:41 2008
@@ -75,12 +75,8 @@
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- private final AtomicBoolean _quiesced = new AtomicBoolean(false);
-
-
protected final SubscriptionList _subscriptionList = new
SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode>
_lastSubscriptionNode = new
AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
@@ -124,7 +120,6 @@
private final Set<NotificationCheck> _notificationChecks =
EnumSet.noneOf(NotificationCheck.class);
-
private final AtomicLong _stateChangeCount = new
AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -163,8 +158,6 @@
_asyncDelivery =
ReferenceCountingExecutorService.getInstance().acquireExecutorService();
- AsyncDeliveryConfig.getAsyncDeliveryExecutor();
-
try
{
_managedObject = new AMQQueueMBean(this);
@@ -278,7 +271,7 @@
}
- deliverAsync();
+ deliverAsync(subscription);
}
@@ -346,6 +339,7 @@
QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
+
if(exclusiveSub != null)
{
exclusiveSub.getSendLock();
@@ -353,6 +347,7 @@
try
{
entry = _entries.add(message);
+
deliverToSubscription(exclusiveSub, entry);
@@ -405,7 +400,7 @@
// this catches the case where we *just* miss an update
int loops = 2;
- while(!entry.isAcquired() && loops != 0)
+ while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
{
if(nextNode == null)
{
@@ -454,6 +449,8 @@
private void deliverToSubscription(final Subscription sub, final
QueueEntry entry)
throws AMQException
{
+ // the send lock is a read/write lock that prevents the subscription
from changing status while we are in this
+ // block
sub.getSendLock();
try
{
@@ -465,10 +462,14 @@
{
if(!sub.isBrowser() && !entry.acquire(sub))
{
+ // restore credit here that would have been taken away
by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
sub.restoreCredit(entry);
}
else
{
+ // Update the last seen marker for this subscription,
if some other process hasn't already
+ // updated it
QueueEntry queueEntryNode = sub.getLastSeenEntry();
if(_entries.next(queueEntryNode) == entry)
{
@@ -490,6 +491,7 @@
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
+ // Simple Queues don't :-)
}
private void incrementQueueSize(final AMQMessage message)
@@ -513,6 +515,8 @@
private boolean subscriptionReadyAndHasInterest(final Subscription sub,
final QueueEntry entry)
{
+ // We need to move this subscription on, past entries which are
already acquired, or deleted or ones it has no
+ // interest in.
QueueEntry node = sub.getLastSeenEntry();
while(node != null && (node.isAcquired() || node.isDeleted() ||
!sub.hasInterest(node)) )
{
@@ -531,28 +535,40 @@
}
+
if(node == entry)
{
+ // If the first entry that subscription can process is the one we
are trying to deliver to it, then we are
+ // good
return true;
}
else
{
- node = sub.getLastSeenEntry();
- if(node != null && entry.compareTo(node) < 0 &&
sub.hasInterest(entry))
+ // Otherwise we should try to update the subscription's last seen
entry to the entry we got to, providing
+ // no-one else has updated it to something further on in the list
+ updateLastSeenEntry(sub, entry);
+ return false;
+ }
+
+ }
+
+ private void updateLastSeenEntry(final Subscription sub, final QueueEntry
entry)
+ {
+ QueueEntry node = sub.getLastSeenEntry();
+
+ if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ {
+ do
{
- do
+ if(sub.setLastSeenEntry(node,entry))
{
- if(sub.setLastSeenEntry(node,entry))
- {
- return true;
- }
- else
- {
- node = sub.getLastSeenEntry();
- }
- } while (node != null && entry.compareTo(node) < 0);
- }
- return false;
+ return;
+ }
+ else
+ {
+ node = sub.getLastSeenEntry();
+ }
+ } while (node != null && entry.compareTo(node) < 0);
}
}
@@ -569,18 +585,7 @@
// we don't make browsers send the same stuff twice
if(!sub.isBrowser())
{
- QueueEntry subEntry = sub.getLastSeenEntry();
- while(subEntry != null && entry.compareTo(subEntry)<0)
- {
- if(sub.setLastSeenEntry(subEntry,entry))
- {
- break;
- }
- else
- {
- subEntry = sub.getLastSeenEntry();
- }
- }
+ updateLastSeenEntry(sub, entry);
}
}
@@ -1307,6 +1312,10 @@
}
+ // if there's (potentially) more than one subscription the others will
potentially not have been advanced to the
+ // next entry they are interested in yet. This would lead to holding
on to references to expired messages, etc
+ // which would give us a memory "leak".
+
if(!isExclusiveSubscriber())
{
advanceAllSubscriptions();
@@ -1494,7 +1503,8 @@
_asynchronousRunner.set(null);
}
-
+ // If deliveries == 0 then the limiting factor was the time-slicing
rather than available messages or credit
+ // therefore we should schedule this runner again (unless someone
beats us to it :-) ).
if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
{
_asyncDelivery.execute(runner);
@@ -1616,43 +1626,36 @@
return _notificationChecks;
}
-
-
-
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public int N(final Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
-
private final class QueueEntryListener implements
QueueEntry.StateChangeListener
{
private final QueueEntry _entry;
+ private final Subscription _sub;
public QueueEntryListener(final Subscription sub, final QueueEntry
entry)
{
_entry = entry;
+ _sub = sub;
}
public boolean equals(Object o)
{
- return _entry == ((QueueEntryListener)o)._entry;
+ return _entry == ((QueueEntryListener)o)._entry && _sub ==
((QueueEntryListener)o)._sub;
}
public int hashCode()
{
- return System.identityHashCode(_entry);
+ return System.identityHashCode(_entry) ^
System.identityHashCode(_sub);
}
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate,
QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync();
+ deliverAsync(_sub);
}
}
}
\ No newline at end of file