Author: aidan
Date: Fri Oct 17 09:09:51 2008
New Revision: 705657
URL: http://svn.apache.org/viewvc?rev=705657&view=rev
Log:
QPID-1315:
BasicBytesFlowControl doesn't wait long enough to determine if the 3rd
message is going to be delivered accidentally. It also ack'd every message, which
was not its intent, so use acknowledgeThis() instead.
Refactor common code out of processQueue and flushSubscription into
attemptDelivery.
Make sure sendLock is held when closing the consumer.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Fri Oct 17 09:09:51 2008
@@ -336,7 +336,14 @@
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- sub.getQueue().unregisterSubscription(sub);
+ try {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
return true;
}
else
@@ -395,7 +402,16 @@
Subscription sub = me.getValue();
- sub.getQueue().unregisterSubscription(sub);
+ try
+ {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+
}
_tag2SubscriptionMap.clear();
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Fri Oct 17 09:09:51 2008
@@ -1174,7 +1174,7 @@
boolean complete = false;
try
{
- complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+ complete = flushSubscription(_sub, new
Long(MAX_ASYNC_DELIVERIES));
}
catch (AMQException e)
@@ -1204,79 +1204,28 @@
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long deliveries) throws
AMQException
+ public boolean flushSubscription(Subscription sub, Long deliveries) throws
AMQException
{
boolean atTail = false;
- boolean advanced;
while (!sub.isSuspended() && !atTail && deliveries != 0)
{
-
- advanced = false;
- sub.getSendLock();
- try
+ try
{
- if (sub.isActive())
+ sub.getSendLock();
+ atTail = attemptDelivery(sub, deliveries);
+ if (atTail && sub.isAutoClose())
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliveries--;
- deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode =
_entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node,
newNode);
- node = sub.getLastSeenEntry();
- }
- }
- }
-
- }
- else
- {
- break;
- }
- }
- else
- {
- // this subscription is not interested in this
node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
-
- }
- atTail = (_entries.next(node) == null) && !advanced;
+ unregisterSubscription(sub);
+ ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
}
}
finally
{
sub.releaseSendLock();
}
-
}
// if there's (potentially) more than one subscription the others will
potentially not have been advanced to the
@@ -1287,16 +1236,72 @@
{
advanceAllSubscriptions();
}
+ return atTail;
+ }
- if (atTail && sub.isAutoClose())
+ private boolean attemptDelivery(Subscription sub, Long deliveries) throws
AMQException
+ {
+ boolean atTail = false;
+ boolean advanced = false;
+ boolean subActive = sub.isActive();
+ if (subActive)
{
- unregisterSubscription(sub);
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (!(node.isAcquired() || node.isDeleted()))
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasInterest(node))
+ {
+ if (!sub.wouldSuspend(node))
+ {
+ if (!sub.isBrowser() && !node.acquire(sub))
+ {
+ sub.restoreCredit(node);
- ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
- }
+ }
+ else
+ {
+ deliveries--;
+ deliverMessage(sub, node);
- return atTail;
+ if (sub.isBrowser())
+ {
+ QueueEntry newNode = _entries.next(node);
+
+ if (newNode != null)
+ {
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
+ }
+ }
+ }
+
+ }
+ else // Not enough Credit for message and wouldSuspend
+ {
+ //QPID-1187 - Treat the subscription as suspended
for this message
+ // and wait for the message to be removed to
continue delivery.
+ subActive = false;
+ node.addStateChangeListener(new
QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so
we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
+ }
+ }
+ }
+
+ }
+ atTail = (_entries.next(node) == null) && !advanced;
+ }
+ return atTail || !subActive;
}
protected void advanceAllSubscriptions() throws AMQException
@@ -1347,7 +1352,7 @@
boolean deliveryIncomplete = true;
int extraLoops = 1;
- int deliveries = MAX_ASYNC_DELIVERIES;
+ Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
_asynchronousRunner.compareAndSet(runner, null);
@@ -1372,110 +1377,46 @@
{
boolean closeConsumer = false;
Subscription sub =
subscriptionIter.getNode().getSubscription();
- if (sub != null)
+ sub.getSendLock();
+ try
{
- sub.getSendLock();
- try
+ if (sub != null)
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null && sub.isActive())
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (node != null)
{
- boolean advanced = false;
- boolean subActive = false;
-
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- subActive = true;
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() &&
!node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliverMessage(sub, node);
- deliveries--;
-
- if (sub.isBrowser())
- {
- QueueEntry newNode =
_entries.next(node);
-
- if (newNode != null)
- {
-
sub.setLastSeenEntry(node, newNode);
- node =
sub.getLastSeenEntry();
- advanced = true;
- }
-
- }
- }
- done = false;
- }
- else // Not enough Credit for message
and wouldSuspend
- {
- //QPID-1187 - Treat the
subscription as suspended for this message
- // and wait for the message to be
removed to continue delivery.
- subActive = false;
-
- node.addStateChangeListener(new
QueueEntryListener(sub, node));
- }
- }
- else
- {
- // this subscription is not interested
in this node so we can skip over it
- QueueEntry newNode =
_entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node,
newNode);
- }
- }
- }
- }
- final boolean atTail = (_entries.next(node) ==
null);
-
- done = done && (!subActive || atTail);
-
- closeConsumer = (atTail && !advanced &&
sub.isAutoClose());
+ done = attemptDelivery(sub, deliveries);
}
}
- finally
+ if (done)
{
- sub.releaseSendLock();
- }
-
- if (closeConsumer)
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
- }
+ if (extraLoops == 0)
+ {
+ deliveryIncomplete = false;
+ if (sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
- }
- if (done)
- {
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
+ ProtocolOutputConverter converter =
sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
+ }
+ }
+ else
+ {
+ extraLoops--;
+ }
}
else
{
- extraLoops--;
+ extraLoops = 1;
}
}
- else
+ finally
{
- extraLoops = 1;
+ sub.releaseSendLock();
}
}
-
_asynchronousRunner.set(null);
}
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=705657&r1=705656&r2=705657&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Fri Oct 17 09:09:51 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.test.client;
import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.log4j.Logger;
@@ -91,25 +92,22 @@
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv.receiveNoWait();
+ Message r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r1.acknowledge();
+ ((AbstractJMSMessage)r1).acknowledgeThis();
- r3 = recv.receiveNoWait();
+ r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r2.acknowledge();
+ ((AbstractJMSMessage)r2).acknowledgeThis();
r3 = recv.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
- r3.acknowledge();
- recv.close();
- consumerSession.close();
+ ((AbstractJMSMessage)r3).acknowledgeThis();
consumerConnection.close();
-
}
public void testTwoConsumersBytesFlowControl() throws Exception
@@ -161,21 +159,21 @@
assertNotNull("First message not received", r1);
assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
- Message r2 = recv1.receiveNoWait();
+ Message r2 = recv1.receive(RECEIVE_TIMEOUT);
assertNull("Second message incorrectly delivered", r2);
Session consumerSession2 = consumerConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256);
MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
- r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+ r2 = recv2.receive(RECEIVE_TIMEOUT);
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv2.receiveNoWait();
+ Message r3 = recv2.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+ r3 = recv1.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));