Author: ritchiem
Date: Fri Feb 16 01:06:47 2007
New Revision: 508351
URL: http://svn.apache.org/viewvc?view=rev&rev=508351
Log:
BasicConsumeMethodHandler - tidied up the local channel/connection close frame
writes: instead of building and writing the close frames inline, the handler
now throws the exception returned by body.get[Channel|Connection]Exception(),
which results in the frames being written out.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=508351&r1=508350&r2=508351
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Fri Feb 16 01:06:47 2007
@@ -75,7 +75,7 @@
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
- if(body.queue!=null)
+ if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
throw
body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
@@ -83,7 +83,7 @@
else
{
String msg = "No queue name provided, no default queue
defined.";
- throw
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
+ throw
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg);
}
}
else
@@ -91,15 +91,15 @@
try
{
AMQShortString consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
-
body.arguments, body.noLocal, body.exclusive);
+
body.arguments, body.noLocal, body.exclusive);
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
// TODO: Connect this to the session version obtained
from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as
versions change.
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag)); // consumerTag
+
(byte) 8, (byte) 0, // AMQP version (major, minor)
+
consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog
of messages
@@ -108,43 +108,28 @@
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as
versions change.
-
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
- AMQConstant.INVALID_SELECTOR.getCode(), //
replyCode
- new AMQShortString(ise.getMessage())));
// replyText
+ throw
body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(),
ise.getMessage());
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique
consumer tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as
versions change.
-
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ throw
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ "Non-unique consumer
tag, '" + body.consumerTag + "'");
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
throw
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
- "Cannot subscribe to queue "
- + queue.getName()
- + " as it already
has an existing exclusive consumer");
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an
existing exclusive consumer");
}
catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
- {
- throw
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
- "Cannot
subscribe to queue "
- +
queue.getName()
- + "
exclusively as it already has a consumer");
- }
+ {
+ throw
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it
already has a consumer");
+ }
}
}