Author: ritchiem
Date: Fri Mar 14 04:36:42 2008
New Revision: 637066
URL: http://svn.apache.org/viewvc?rev=637066&view=rev
Log:
QPID-852 : Updated broker so that it closes consumers when there are no
messages on the queue.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Fri Mar 14 04:36:42 2008
@@ -700,6 +700,8 @@
{
_subscribers.setExclusive(true);
}
+
+ subscription.start();
}
private boolean isExclusive()
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Fri Mar 14 04:36:42 2008
@@ -45,8 +45,6 @@
void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
- boolean isAutoClose();
-
void close();
boolean isClosed();
@@ -60,4 +58,6 @@
Object getSendLock();
AMQChannel getChannel();
+
+ void start();
}
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Fri Mar 14 04:36:42 2008
@@ -461,7 +461,7 @@
}
}
- public boolean isAutoClose()
+ private boolean isAutoClose()
{
return _autoClose;
}
@@ -523,19 +523,24 @@
{
_logger.info("Closing autoclose subscription (" + debugIdentity()
+ "):" + this);
- ProtocolOutputConverter converter =
protocolSession.getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(channel.getChannelId(),
consumerTag);
- _sentClose = true;
-
- //fixme JIRA do this better
+ boolean unregisteredOK = false;
try
{
- channel.unsubscribeConsumer(protocolSession, consumerTag);
+ unregisteredOK = channel.unsubscribeConsumer(protocolSession,
consumerTag);
}
catch (AMQException e)
{
// Occurs if we cannot find the subscriber in the channel with
protocolSession and consumerTag.
+ _logger.info("Unable to UnsubscribeConsumer :" + consumerTag
+" so not going to send CancelOK.");
}
+
+ if (unregisteredOK)
+ {
+ ProtocolOutputConverter converter =
protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(),
consumerTag);
+ _sentClose = true;
+ }
+
}
}
@@ -664,6 +669,21 @@
public AMQChannel getChannel()
{
return channel;
+ }
+
+ public void start()
+ {
+ //Check to see if we need to autoclose
+ if (filtersMessages())
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ autoclose();
+ }
+ }
+ }
}
}