Author: aidan
Date: Fri Oct 24 09:20:16 2008
New Revision: 707672
URL: http://svn.apache.org/viewvc?rev=707672&view=rev
Log:
QPID-1315: Fix style issue, iterator control usage as per review comments from
rgodfrey.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=707672&r1=707671&r2=707672&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Fri Oct 24 09:20:16 2008
@@ -333,7 +333,8 @@
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- try {
+ try
+ {
sub.getSendLock();
sub.getQueue().unregisterSubscription(sub);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=707672&r1=707671&r2=707672&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Fri Oct 24 09:20:16 2008
@@ -1204,16 +1204,16 @@
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, Long deliveries) throws
AMQException
+ public boolean flushSubscription(Subscription sub, Long iterations) throws
AMQException
{
boolean atTail = false;
- while (!sub.isSuspended() && !atTail && deliveries != 0)
+ while (!sub.isSuspended() && !atTail && iterations != 0)
{
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub, deliveries);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1221,6 +1221,10 @@
ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
}
+ else if (!atTail)
+ {
+ iterations--;
+ }
}
finally
{
@@ -1239,7 +1243,7 @@
return atTail;
}
- private boolean attemptDelivery(Subscription sub, Long deliveries) throws
AMQException
+ private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
@@ -1258,11 +1262,9 @@
if (!sub.isBrowser() && !node.acquire(sub))
{
sub.restoreCredit(node);
-
}
else
{
- deliveries--;
deliverMessage(sub, node);
if (sub.isBrowser())
@@ -1352,11 +1354,11 @@
boolean deliveryIncomplete = true;
int extraLoops = 1;
- Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
+ Long iterations = new Long(MAX_ASYNC_DELIVERIES);
_asynchronousRunner.compareAndSet(runner, null);
- while (deliveries != 0 && ((previousStateChangeCount !=
(stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) &&
_asynchronousRunner.compareAndSet(null, runner))
+ while (iterations != 0 && ((previousStateChangeCount !=
(stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) &&
_asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has
reached the point where it cannot move
// further, just in case the advance of one subscription in the
last loop allows a different subscription to
@@ -1386,7 +1388,7 @@
QueueEntry node = moveSubscriptionToNextNode(sub);
if (node != null)
{
- done = attemptDelivery(sub, deliveries);
+ done = attemptDelivery(sub);
}
}
if (done)
@@ -1409,6 +1411,7 @@
}
else
{
+ iterations--;
extraLoops = 1;
}
}
@@ -1422,7 +1425,7 @@
// If iterations == 0 then the limiting factor was the time-slicing
rather than available messages or credit
// therefore we should schedule this runner again (unless someone
beats us to it :-) ).
- if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner))
+ if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
_asyncDelivery.execute(runner);
}