Author: rgreig
Date: Mon Jan 22 02:26:03 2007
New Revision: 498574

URL: http://svn.apache.org/viewvc?view=rev&rev=498574
Log:
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos 
size / default exchanges and some other small fixes highlighted by the python 
tests

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java

Added: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?view=auto&rev=498574
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 Mon Jan 22 02:26:03 2007
@@ -0,0 +1,77 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicGetMethodHandler implements 
StateAwareMethodListener<BasicGetBody>
+{
+    private static final Logger _log = 
Logger.getLogger(BasicGetMethodHandler.class);
+
+    private static final BasicGetMethodHandler _instance = new 
BasicGetMethodHandler();
+
+    public static BasicGetMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicGetMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry 
queueRegistry,
+                               ExchangeRegistry exchangeRegistry, 
AMQProtocolSession session,
+                               AMQMethodEvent<BasicGetBody> evt) throws 
AMQException
+    {
+        BasicGetBody body = evt.getMethod();
+        final int channelId = evt.getChannelId();
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            _log.error("Channel " + channelId + " not found");
+            // TODO: either alert or error that the
+        }
+        else
+        {
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : 
queueRegistry.getQueue(body.queue);
+
+            if (queue == null)
+            {
+                _log.info("No queue for '" + body.queue + "'");
+                if(body.queue!=null)
+                {
+                    throw 
body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+                                                      "No such queue, '" + 
body.queue + "'");
+                }
+                else
+                {
+                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+                                                      "No queue name provided, 
no default queue defined.");
+                }
+            }
+            else
+            {
+                if(!queue.performGet(session, channel, !body.noAck))
+                {
+
+
+                    // TODO - set clusterId
+                    
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 
0, null));
+                }
+            }
+        }
+    }
+}

Added: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=auto&rev=498574
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 Mon Jan 22 02:26:03 2007
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements 
StateAwareMethodListener<QueuePurgeBody>
+{
+    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+    public static QueuePurgeHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private final boolean _failIfNotFound;
+
+    public QueuePurgeHandler()
+    {
+        this(true);
+    }
+
+    public QueuePurgeHandler(boolean failIfNotFound)
+    {
+        _failIfNotFound = failIfNotFound;
+    }
+
+    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    {
+        QueuePurgeBody body = evt.getMethod();
+        AMQQueue queue;
+        if(body.queue == null)
+        {
+            queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+            if(queue == null)
+            {
+                if(_failIfNotFound)
+                {
+                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue 
specified.");
+                }
+
+            }
+        }
+        else
+        {
+            queue = queues.getQueue(body.queue);
+        }
+
+        if(queue == null)
+        {
+            if(_failIfNotFound)
+            {
+                throw body.getChannelException(404, "Queue " + body.queue + " 
does not exist.");
+            }
+        }
+        else
+        {
+                long purged = 
queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+
+
+                if(!body.nowait)
+                {
+                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as 
versions change.
+                    
session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
+                        (byte)8, (byte)0,      // AMQP version (major, minor)
+                        purged));      // messageCount
+                }
+        }
+    }
+}


Reply via email to