Author: gsim
Date: Mon Jan 29 04:13:04 2007
New Revision: 501021

URL: http://svn.apache.org/viewvc?view=rev&rev=501021
Log:
Fixes to get the python queue tests to work.
(NB: currently, auto-delete is not in so tests that re-use the same exclusive 
queue conflict with each other)


Added:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
   (with props)
Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Mon Jan 29 04:13:04 2007
@@ -375,7 +375,7 @@
      * @throws AMQException                  if something goes wrong
      */
     public String subscribeToQueue(String tag, AMQQueue queue, 
AMQProtocolSession session, boolean acks,
-                                   FieldTable filters, boolean noLocal) throws 
AMQException, ConsumerTagNotUniqueException
+                                   FieldTable filters, boolean noLocal, 
boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -386,7 +386,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks, filters, 
noLocal);
+        queue.registerProtocolSession(session, _channelId, tag, acks, filters, 
noLocal, exclusive);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
 Mon Jan 29 04:13:04 2007
@@ -54,20 +54,18 @@
                                AMQMethodEvent<ConnectionOpenBody> evt) throws 
AMQException
     {
         ConnectionOpenBody body = evt.getMethod();
-        String contextKey = body.virtualHost;
 
         //todo //FIXME The virtual host must be validated by the server for 
the connection to open-ok
         // See Spec (0.8.2). Section  3.1.2 Virtual Hosts
-        if (contextKey == null)
+        if (protocolSession.getContextKey() == null)
         {
-            contextKey = generateClientID();
+            protocolSession.setContextKey(generateClientID());
         }
-        protocolSession.setContextKey(contextKey);
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
             protocolSession.getMajor(), // AMQP major version
             protocolSession.getMinor(), // AMQP minor version
-            contextKey);       // knownHosts
+            body.virtualHost); // knownHosts
         
protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeResponse(evt, response);
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 Mon Jan 29 04:13:04 2007
@@ -84,7 +84,7 @@
                 try
                 {
                     /*AMQShort*/String destination = channel.subscribeToQueue
-                        (body.destination, queue, session, !body.noAck, 
/*XXX*/null, body.noLocal);
+                        (body.destination, queue, session, !body.noAck, 
/*XXX*/null, body.noLocal, body.exclusive);
                     // Be aware of possible changes to parameter order as 
versions change.
                     session.writeResponse(evt, MessageOkBody.createMethodBody(
                         session.getMajor(), // AMQP major version

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 Mon Jan 29 04:13:04 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.Exchange;
@@ -77,26 +78,45 @@
         }
         //TODO: do we need to check that the queue already exists with exactly 
the same "configuration"?
 
+        AMQQueue queue = null;
         QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
         synchronized (queueRegistry)
         {
-            AMQQueue queue;
             if ((queue = queueRegistry.getQueue(body.queue)) == null)
             {
-                queue = createQueue(body, queueRegistry, protocolSession);
-                if (queue.isDurable() && !queue.isAutoDelete())
+                if(body.passive)
                 {
-                    _store.createQueue(queue);
+                    String msg = "Queue: " + body.queue + " not found.";
+                    throw 
body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
+
                 }
-                queueRegistry.registerQueue(queue);
-                if (autoRegister)
+                else
                 {
-                    Exchange defaultExchange = 
protocolSession.getExchangeRegistry().getExchange("amq.direct");
-                    defaultExchange.registerQueue(body.queue, queue, null);
-                    queue.bind(body.queue, defaultExchange);
-                    _log.info("Queue " + body.queue + " bound to default 
exchange");
+                    queue = createQueue(body, queueRegistry, protocolSession);
+                    if (queue.isDurable() && !queue.isAutoDelete())
+                    {
+                        _store.createQueue(queue);
+                    }
+                    queueRegistry.registerQueue(queue);
+                    if (autoRegister)
+                    {
+                        Exchange defaultExchange = 
protocolSession.getExchangeRegistry().getExchange("amq.direct");
+                        defaultExchange.registerQueue(body.queue, queue, null);
+                        queue.bind(body.queue, defaultExchange);
+                        _log.info("Queue " + body.queue + " bound to default 
exchange");
+                    }
                 }
             }
+            else if(queue.getOwner() != null && 
!protocolSession.getContextKey().equals(queue.getOwner()))
+            {
+                // todo - constant
+                throw body.getChannelException(405, "Cannot declare queue, as 
exclusive queue with same name declared on another connection");        
+
+            }
+            else
+            {
+                _log.info("Queue " + body.queue + " exists and is accesible to 
this connection [owner=" + queue.getOwner() +"]");
+            }
             //set this as the default queue on the channel:
             
protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
         }
@@ -106,8 +126,8 @@
             AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
                 protocolSession.getMajor(), // AMQP major version
                 protocolSession.getMinor(), // AMQP minor version
-                0L, // consumerCount
-                0L, // messageCount
+                queue.getConsumerCount(), // consumerCount
+                queue.getMessageCount(), // messageCount
                 body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeResponse(evt, response);
@@ -128,6 +148,7 @@
             throws AMQException
     {
         String owner = body.exclusive ? session.getContextKey() : null;
+        if (owner != null) _log.info("Queue " + body.queue + " is owned by " + 
owner);
         return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, 
registry);
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 Mon Jan 29 04:13:04 2007
@@ -79,13 +79,26 @@
         }
         else
         {
-            int purged = queue.delete(body.ifUnused, body.ifEmpty);
-            _store.removeQueue(queue.getName());
-            // Be aware of possible changes to parameter order as versions 
change.
-            session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
-                session.getMajor(), // AMQP major version
-                session.getMinor(), // AMQP minor version
-                purged));      // messageCount
+            if(body.ifEmpty && !queue.isEmpty())
+            {
+                throw body.getChannelException(406, "Queue: " + body.queue + " 
is not empty." );
+            }
+            else if(body.ifUnused && !queue.isUnused())
+            {                
+                // TODO - Error code
+                throw body.getChannelException(406, "Queue: " + body.queue + " 
is still used." );
+
+            }
+            else
+            {
+                int purged = queue.delete(body.ifUnused, body.ifEmpty);
+                _store.removeQueue(queue.getName());
+                // Be aware of possible changes to parameter order as versions 
change.
+                session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
+                                                session.getMajor(), // AMQP 
major version
+                                                session.getMinor(), // AMQP 
minor version
+                                                purged));      // messageCount
+            }
         }
     }
 }

Added: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=auto&rev=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 (added)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 Mon Jan 29 04:13:04 2007
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements 
StateAwareMethodListener<QueuePurgeBody>
+{
+    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+    public static QueuePurgeHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private final boolean _failIfNotFound;
+
+    public QueuePurgeHandler()
+    {
+        this(true);
+    }
+
+    public QueuePurgeHandler(boolean failIfNotFound)
+    {
+        _failIfNotFound = failIfNotFound;
+    }
+
+    public void methodReceived(AMQProtocolSession session, 
AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    {
+        QueueRegistry queueRegistry = session.getQueueRegistry();
+
+        QueuePurgeBody body = evt.getMethod();
+        AMQQueue queue;
+        if(body.queue == null)
+        {
+            queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+            if(queue == null)
+            {
+                if(_failIfNotFound)
+                {
+                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue 
specified.");
+                }
+
+            }
+        }
+        else
+        {
+            queue = queueRegistry.getQueue(body.queue);
+        }
+
+        if(queue == null)
+        {
+            if(_failIfNotFound)
+            {
+                throw body.getChannelException(404, "Queue " + body.queue + " 
does not exist.");
+            }
+        }
+        else
+        {
+            long purged = queue.clearQueue();
+            
+            
+            if(!body.nowait)
+            {
+                AMQMethodBody response 
+                    = QueuePurgeOkBody.createMethodBody(session.getMajor(), 
session.getMinor(), purged);
+                session.writeResponse(evt, response);
+
+            }
+        }
+    }
+}

Propchange: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Mon Jan 29 04:13:04 2007
@@ -49,6 +49,7 @@
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -82,6 +83,8 @@
 {
     private static final Logger _logger = 
Logger.getLogger(AMQProtocolSession.class);
 
+    private static final String CLIENT_PROPERTIES_INSTANCE = 
ClientProperties.instance.toString();
+
     private final IoSession _minaProtocolSession;
 
     private String _contextKey;
@@ -666,6 +669,10 @@
     public void setClientProperties(FieldTable clientProperties)
     {
         _clientProperties = clientProperties;
+        if((_clientProperties != null) && 
(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+        {
+            
setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE));
+        }
     }
     
     public QueueRegistry getQueueRegistry()

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Mon Jan 29 04:13:04 2007
@@ -34,6 +34,8 @@
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any 
other abstraction like
@@ -41,6 +43,28 @@
  */
 public class AMQQueue implements Managable, Comparable
 {
+    public static final class ExistingExclusiveSubscription extends 
AMQException
+    {
+
+        public ExistingExclusiveSubscription()
+        {
+            super("");
+        }
+    }
+
+    public static final class ExistingSubscriptionPreventsExclusive extends 
AMQException
+    {
+
+        public ExistingSubscriptionPreventsExclusive()
+        {
+            super("");
+        }
+    }
+
+    private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = 
new ExistingExclusiveSubscription();
+    private static final ExistingSubscriptionPreventsExclusive 
EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
+
+
     private static final Logger _logger = Logger.getLogger(AMQQueue.class);
 
     private final String _name;
@@ -64,6 +88,10 @@
 
     private final SubscriptionFactory _subscriptionFactory;
 
+    private final AtomicInteger _subscriberCount = new AtomicInteger();
+
+    private final AtomicBoolean _isExclusive = new AtomicBoolean();
+
     /**
      * Manages message delivery.
      */
@@ -352,9 +380,9 @@
     /**
      * removes all the messages from the queue.
      */
-    public void clearQueue() throws AMQException
+    public long clearQueue() throws AMQException
     {
-        _deliveryMgr.clearAllMessages();
+        return _deliveryMgr.clearAllMessages();
     }
 
     public void bind(String routingKey, Exchange exchange)
@@ -362,14 +390,29 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters) throws AMQException
-    {
-        registerProtocolSession(ps, channel, consumerTag, acks, filters, 
false);
-    }
-
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks,
+                                        FieldTable filters, boolean noLocal, 
boolean exclusive)
             throws AMQException
     {
+        if(incrementSubscriberCount() > 1)
+        {
+            if(isExclusive())
+            {
+                decrementSubscriberCount();
+                throw EXISTING_EXCLUSIVE;
+            }
+            else if(exclusive)
+            {
+                decrementSubscriberCount();
+                throw EXISTING_SUBSCRIPTION;
+            }
+
+        }
+        else if(exclusive)
+        {
+            setExclusive(true);
+        }
+
         debug("Registering protocol session {0} with channel {1} and consumer 
tag {2} with {3}", ps, channel, consumerTag, this);
 
         Subscription subscription = 
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, 
filters, noLocal);
@@ -378,13 +421,33 @@
         {
             if (_deliveryMgr.hasQueuedMessages())
             {
-                _deliveryMgr.populatePreDeliveryQueue(subscription);   
+                _deliveryMgr.populatePreDeliveryQueue(subscription);
             }
         }
 
         _subscribers.addSubscriber(subscription);
     }
 
+    private boolean isExclusive()
+    {
+        return _isExclusive.get();
+    }
+
+    private void setExclusive(boolean exclusive)
+    {
+        _isExclusive.set(exclusive);
+    }
+
+    private int incrementSubscriberCount()
+    {
+        return _subscriberCount.incrementAndGet();
+    }
+
+    private int decrementSubscriberCount()
+    {
+        return _subscriberCount.decrementAndGet();
+    }
+
     public void unregisterProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag) throws AMQException
     {
         debug("Unregistering protocol session {0} with channel {1} and 
consumer tag {2} from {3}", ps, channel, consumerTag,
@@ -400,6 +463,10 @@
                                    " and protocol session key " + ps.getKey() 
+ " not registered with queue " + this);
         }
 
+        setExclusive(false);
+        decrementSubscriberCount();
+
+
         // if we are eligible for auto deletion, unregister from the queue 
registry
         if (_autoDelete && _subscribers.isEmpty())
         {
@@ -409,6 +476,17 @@
             removedSubscription.queueDeleted(this);
         }
     }
+
+    public boolean isUnused()
+    {
+        return _subscribers.isEmpty();
+    }
+
+    public boolean isEmpty()
+    {
+        return !_deliveryMgr.hasQueuedMessages();
+    }
+
 
     public int delete(boolean checkUnused, boolean checkEmpty) throws 
AMQException
     {

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
 Mon Jan 29 04:13:04 2007
@@ -217,7 +217,7 @@
         }
     }
 
-    public synchronized void clearAllMessages() throws AMQException
+    public synchronized long clearAllMessages() throws AMQException
     {
         AMQMessage msg = poll();
         while (msg != null)
@@ -225,6 +225,7 @@
             msg.dequeue(_queue);
             msg = poll();
         }
+        return 0;
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Mon Jan 29 04:13:04 2007
@@ -176,14 +176,17 @@
         }
     }
 
-    public synchronized void clearAllMessages() throws AMQException
+    public synchronized long clearAllMessages() throws AMQException
     {
+        long count = 0;
         AMQMessage msg = poll();
         while (msg != null)
         {
             msg.dequeue(_queue);
+            count++;
             msg = poll();
         }
+        return count;
     }
 
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 Mon Jan 29 04:13:04 2007
@@ -70,7 +70,7 @@
 
     void removeAMessageFromTop() throws AMQException;
 
-    void clearAllMessages() throws AMQException;
+    long clearAllMessages() throws AMQException;
 
     List<AMQMessage> getMessages();
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
 Mon Jan 29 04:13:04 2007
@@ -136,7 +136,7 @@
         }
     }
 
-    public synchronized void clearAllMessages() throws AMQException
+    public synchronized long clearAllMessages() throws AMQException
     {
         AMQMessage msg = poll();
         while (msg != null)
@@ -144,6 +144,7 @@
             msg.dequeue(_queue);
             msg = poll();
         }
+        return 0;
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
 Mon Jan 29 04:13:04 2007
@@ -134,6 +134,7 @@
         frame2handlerMap.put(QueueBindBody.class, 
QueueBindHandler.getInstance());
         frame2handlerMap.put(QueueDeclareBody.class, 
QueueDeclareHandler.getInstance());
         frame2handlerMap.put(QueueDeleteBody.class, 
QueueDeleteHandler.getInstance());
+        frame2handlerMap.put(QueuePurgeBody.class, 
QueuePurgeHandler.getInstance());
         frame2handlerMap.put(ChannelFlowBody.class, 
ChannelFlowHandler.getInstance());
         frame2handlerMap.put(TxSelectBody.class, 
TxSelectHandler.getInstance());
         frame2handlerMap.put(TxCommitBody.class, 
TxCommitHandler.getInstance());

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
 Mon Jan 29 04:13:04 2007
@@ -92,7 +92,7 @@
 
     public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, 
"resource error", true);
 
-    public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not 
allowed", true);
+    public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not 
allowed", true);
 
     public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, 
"not implemented", true);
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=501021&r1=501020&r2=501021
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Mon Jan 29 04:13:04 2007
@@ -70,7 +70,7 @@
         _channel = new AMQChannel(1,_protocolSession, _messageStore, 
null,null);
         _protocolSession.addChannel(_channel);
         
-        _queue.registerProtocolSession(_protocolSession, 1, "test", false, 
null);
+        _queue.registerProtocolSession(_protocolSession, 1, "test", false, 
null, false, false);
         assertTrue(_queueMBean.getActiveConsumerCount() == 1);
 
         SubscriptionSet _subscribers = (SubscriptionSet) mgr;


Reply via email to