Author: rgreig
Date: Mon Jan 22 02:26:03 2007
New Revision: 498574
URL: http://svn.apache.org/viewvc?view=rev&rev=498574
Log:
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos
size / default exchanges and some other small fixes highlighted by the python
tests
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?view=auto&rev=498574
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
(added)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
Mon Jan 22 02:26:03 2007
@@ -0,0 +1,77 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicGetMethodHandler implements
StateAwareMethodListener<BasicGetBody>
+{
+ private static final Logger _log =
Logger.getLogger(BasicGetMethodHandler.class);
+
+ private static final BasicGetMethodHandler _instance = new
BasicGetMethodHandler();
+
+ public static BasicGetMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicGetMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry
queueRegistry,
+ ExchangeRegistry exchangeRegistry,
AMQProtocolSession session,
+ AMQMethodEvent<BasicGetBody> evt) throws
AMQException
+ {
+ BasicGetBody body = evt.getMethod();
+ final int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ _log.error("Channel " + channelId + " not found");
+ // TODO: either alert or error that the
+ }
+ else
+ {
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() :
queueRegistry.getQueue(body.queue);
+
+ if (queue == null)
+ {
+ _log.info("No queue for '" + body.queue + "'");
+ if(body.queue!=null)
+ {
+ throw
body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ "No such queue, '" +
body.queue + "'");
+ }
+ else
+ {
+ throw
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ "No queue name provided,
no default queue defined.");
+ }
+ }
+ else
+ {
+ if(!queue.performGet(session, channel, !body.noAck))
+ {
+
+
+ // TODO - set clusterId
+
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte)
0, null));
+ }
+ }
+ }
+ }
+}
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=auto&rev=498574
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
(added)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
Mon Jan 22 02:26:03 2007
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements
StateAwareMethodListener<QueuePurgeBody>
+{
+ private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+ public static QueuePurgeHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private final boolean _failIfNotFound;
+
+ public QueuePurgeHandler()
+ {
+ this(true);
+ }
+
+ public QueuePurgeHandler(boolean failIfNotFound)
+ {
+ _failIfNotFound = failIfNotFound;
+ }
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues,
ExchangeRegistry exchanges, AMQProtocolSession session,
AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ {
+ QueuePurgeBody body = evt.getMethod();
+ AMQQueue queue;
+ if(body.queue == null)
+ {
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue
specified.");
+ }
+
+ }
+ }
+ else
+ {
+ queue = queues.getQueue(body.queue);
+ }
+
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getChannelException(404, "Queue " + body.queue + "
does not exist.");
+ }
+ }
+ else
+ {
+ long purged =
queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+
+
+ if(!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as
versions change.
+
session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
+ }
+ }
+ }
+}