Author: ritchiem
Date: Fri Feb 16 01:06:47 2007
New Revision: 508351

URL: http://svn.apache.org/viewvc?view=rev&rev=508351
Log:
BasicConsumeMethodHandler - tidied up local channel/connection close frame 
writes by using the body.get[Channel|Connection]Exception() to throw a new 
exception to write out the frames.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=508351&r1=508350&r2=508351
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Fri Feb 16 01:06:47 2007
@@ -75,7 +75,7 @@
             if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
-                if(body.queue!=null)
+                if (body.queue != null)
                 {
                     String msg = "No such queue, '" + body.queue + "'";
                     throw 
body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
@@ -83,7 +83,7 @@
                 else
                 {
                     String msg = "No queue name provided, no default queue 
defined.";
-                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
+                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg);
                 }
             }
             else
@@ -91,15 +91,15 @@
                 try
                 {
                     AMQShortString consumerTag = 
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
-                                                                  
body.arguments, body.noLocal, body.exclusive);
+                                                                          
body.arguments, body.noLocal, body.exclusive);
                     if (!body.nowait)
                     {
                         // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
                         // TODO: Connect this to the session version obtained 
from ProtocolInitiation for this session.
                         // Be aware of possible changes to parameter order as 
versions change.
                         
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
-                            (byte)8, (byte)0,  // AMQP version (major, minor)
-                            consumerTag));             // consumerTag
+                                                                             
(byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                             
consumerTag));        // consumerTag
                     }
 
                     //now allow queue to start async processing of any backlog 
of messages
@@ -108,43 +108,28 @@
                 catch (AMQInvalidSelectorException ise)
                 {
                     _log.info("Closing connection due to invalid selector");
-                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as 
versions change.
-                    
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,      // AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
-                        AMQConstant.INVALID_SELECTOR.getCode(),        // 
replyCode
-                        new AMQShortString(ise.getMessage())));                
// replyText
+                    throw 
body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), 
ise.getMessage());
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
                     AMQShortString msg = new AMQShortString("Non-unique 
consumer tag, '" + body.consumerTag + "'");
-                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as 
versions change.
-                    
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,      // AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),   // 
classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),  // 
methodId
-                        AMQConstant.NOT_ALLOWED.getCode(),     // replyCode
-                        msg)); // replyText
+                    throw 
body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+                                                      "Non-unique consumer 
tag, '" + body.consumerTag + "'");
                 }
                 catch (AMQQueue.ExistingExclusiveSubscription e)
                 {
                     throw 
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
-                                                  "Cannot subscribe to queue "
-                                                          + queue.getName()
-                                                          + " as it already 
has an existing exclusive consumer");
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " as it already has an 
existing exclusive consumer");
                 }
                 catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
-                                {
-                                    throw 
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
-                                                                  "Cannot 
subscribe to queue "
-                                                                          + 
queue.getName()
-                                                                          + " 
exclusively as it already has a consumer");
-                                }
+                {
+                    throw 
body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " exclusively as it 
already has a consumer");
+                }
 
             }
         }


Reply via email to