Author: kpvdr
Date: Mon Jan 22 12:58:01 2007
New Revision: 498797

URL: http://svn.apache.org/viewvc?view=rev&rev=498797
Log:
Added session close convenience methods to broker ProtocolSession, modified 
handlers that need to close a session to use new methods. Added logger to 
RequestManager and ResponseManager.

Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
 Mon Jan 22 12:58:01 2007
@@ -51,8 +51,8 @@
                                AMQMethodEvent<ChannelCloseBody> evt) throws 
AMQException
     {
         ChannelCloseBody body = evt.getMethod();
-        _logger.info("Received channel close for id " + evt.getChannelId() + " 
citing class " + body.classId +
-                     " and method " + body.methodId);
+        _logger.info("Received channel close for id " + evt.getChannelId() + " 
citing class " +
+            body.classId + " and method " + body.methodId);
         protocolSession.closeChannelResponse(evt.getChannelId(), 
evt.getRequestId());
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
 Mon Jan 22 12:58:01 2007
@@ -51,19 +51,8 @@
                                AMQMethodEvent<ConnectionCloseBody> evt) throws 
AMQException
     {
         final ConnectionCloseBody body = evt.getMethod();
-        _logger.info("ConnectionClose received with reply code/reply text " + 
body.replyCode + "/" +
-                     body.replyText +  " for " + protocolSession);
-        // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, 
ConnectionCloseOkBody.createMethodBody(
-            protocolSession.getMajor(),  // AMQP major version
-            protocolSession.getMinor())); // AMQP minor version
-        try
-        {
-            protocolSession.closeSession();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
+        _logger.info("ConnectionClose received with reply code/reply text " + 
body.replyCode +
+            "/" + body.replyText +  " for " + protocolSession);
+        protocolSession.closeSessionResponse(evt.getRequestId());
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
 Mon Jan 22 12:58:01 2007
@@ -49,17 +49,7 @@
     public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionCloseOkBody> evt) 
throws AMQException
     {
-        //todo should this not do more than just log the method?
         _logger.info("Received Connection-close-ok");
-
-        try
-        {
-            
protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-            protocolSession.closeSession();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
+        protocolSession.closeSession();
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
 Mon Jan 22 12:58:01 2007
@@ -76,16 +76,9 @@
                 // Can't do this as we violate protocol. Need to send Close
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), 
AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
-                stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                // Be aware of possible changes to parameter order as versions 
change.
-                AMQMethodBody close = ConnectionCloseBody.createMethodBody(
-                    major, minor,      // AMQP version (major, minor)
-                    ConnectionCloseBody.getClazz(major, minor),                
// classId
-                    ConnectionCloseBody.getMethod(major, minor),       // 
methodId
-                    AMQConstant.NOT_ALLOWED.getCode(), // replyCode
-                    AMQConstant.NOT_ALLOWED.getName());        // replyText
-                protocolSession.writeResponse(evt, close);
                 disposeSaslServer(protocolSession);
+                
protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                    AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), 
body.getMethod());
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 Mon Jan 22 12:58:01 2007
@@ -23,20 +23,15 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
 public class MessageConsumeHandler implements 
StateAwareMethodListener<MessageConsumeBody>
@@ -77,14 +72,11 @@
                 {
                     session.closeChannelRequest(evt.getChannelId(), 
AMQConstant.NOT_FOUND.getCode(),
                         "No such queue, '" + body.queue + "'");
-//                     channelClose(session, channelId, stateManager,
-//                                  "No such queue, '" + body.queue + "'", 
AMQConstant.NOT_FOUND);
                 }
                 else
                 {
-                    connectionClose(session, channelId, 
session.getStateManager(),
-                                    "No queue name provided, no default queue 
defined.",
-                                    AMQConstant.NOT_ALLOWED);
+                    
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                        "No queue name provided, no default queue defined.", 
body.getClazz(), body.getMethod());
                 }
             }
             else
@@ -103,54 +95,18 @@
                 }
                 catch (AMQInvalidSelectorException ise)
                 {
-                    _log.info("Closing connection due to invalid selector");
+                    _log.info("Closing connection due to invalid selector: " + 
ise.getMessage());
                     session.closeChannelRequest(evt.getChannelId(), 
AMQConstant.INVALID_SELECTOR.getCode(),
                         ise.getMessage());
-//                    channelClose(session, channelId, stateManager, 
ise.getMessage(), AMQConstant.INVALID_SELECTOR);
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
-                    connectionClose(session, channelId, 
session.getStateManager(),
-                                    "Non-unique consumer tag, '" + 
body.destination + "'",
-                                    AMQConstant.NOT_ALLOWED);
+                    _log.info("Closing connection due to duplicate 
(non-unique) consumer tag: " + e.getMessage());
+                    
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                        "Non-unique consumer tag, '" + body.destination + "'", 
body.getClazz(), body.getMethod());
                 }
             }
         }
     }
-
-//     private void channelClose(AMQProtocolSession session, int channelId, 
AMQMethodListener listener,
-//                               String message, AMQConstant code)
-//         throws AMQException
-//     {
-//         /*AMQShort*/String msg = new /*AMQShort*/String(message);
-//         // AMQP version change: Hardwire the version to 0-9 (major=0, 
minor=9)
-//         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-//         // Be aware of possible changes to parameter order as versions 
change.
-//         session.writeRequest(channelId, ChannelCloseBody.createMethodBody
-//                              ((byte)0, (byte)9,     // AMQP version (major, 
minor)
-//                               MessageConsumeBody.getClazz((byte)0, 
(byte)9),        // classId
-//                               MessageConsumeBody.getMethod((byte)0, 
(byte)9),       // methodId
-//                               code.getCode(),       // replyCode
-//                               msg), // replyText
-//                              listener);
-//     }
-
-    private void connectionClose(AMQProtocolSession session, int channelId, 
AMQMethodListener listener,
-                                 String message, AMQConstant code)
-        throws AMQException
-    {
-        byte major = session.getMajor();
-        byte minor = session.getMinor();
-        /*AMQShort*/String msg = new /*AMQShort*/String(message);
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
-                                major, minor,  // AMQP version (major, minor)
-                                MessageConsumeBody.getClazz(major, minor),     
// classId
-                                MessageConsumeBody.getMethod(major, minor),    
// methodId
-                                code.getCode(),        // replyCode
-                                msg),  // replyText
-                             listener);
-    }
-
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Mon Jan 22 12:58:01 2007
@@ -33,6 +33,9 @@
 import org.apache.qpid.framing.ConnectionOpenBody;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
@@ -59,6 +62,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -100,6 +104,7 @@
 
     private Object _lastSent;
 
+    private boolean _closePending;
     private boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
@@ -128,6 +133,8 @@
         _codecFactory = codecFactory;
         _managedObject = createMBean();
         _managedObject.register();
+        _closePending = false;
+        _closed = false;
     }
 
     public AMQMinaProtocolSession(IoSession session, QueueRegistry 
queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -143,6 +150,8 @@
         _codecFactory = codecFactory;
         _managedObject = createMBean();
         _managedObject.register();
+        _closePending = false;
+        _closed = false;
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -168,7 +177,8 @@
         return (AMQProtocolSession) minaProtocolSession.getAttachment();
     }
 
-    private AMQChannel createChannel(int id) throws AMQException {
+    private AMQChannel createChannel(int id) throws AMQException
+    {
         IApplicationRegistry registry = ApplicationRegistry.getInstance();
         AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
                                             _exchangeRegistry, this, 
_stateManager);
@@ -221,12 +231,22 @@
 
             }
         }
-        else
+        else if(!_closed)
         {
             AMQFrame frame = (AMQFrame) message;
-
             AMQChannel channel = getChannel(frame.channel);
-            if (channel == null)
+
+            if (_closePending)
+            {
+                // If a close is pending (ie ChannelClose has been sent, but 
no ChannelCloseOk received), then
+                // all methods except ChannelCloseOk must be rejected. (AMQP 
spec)
+                if((frame.bodyFrame instanceof AMQRequestBody))
+                    throw new AMQException("Incoming request frame on 
connection which is pending close.");
+                AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+                if (!(requestBody.getMethodPayload() instanceof 
ConnectionCloseOkBody))
+                    throw new AMQException("Incoming frame on unopened channel 
is not a Connection.Open method.");         
+            }
+            else if (channel == null)
             {
                 // Perform a check on incoming frames that may result in a new 
channel
                 // being opened. The frame MUST be:
@@ -235,12 +255,12 @@
                 // c. Must be a ConnectionOpenBody method.
                 // Throw an exception for all other incoming frames on an 
unopened channel
                 if(!(frame.bodyFrame instanceof AMQRequestBody))
-                    throw new AMQException("Incoming frame on unopened channel 
not a request");
+                    throw new AMQException("Incoming frame on unopened channel 
is not a request.");
                 AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
-                if (requestBody.getMethodPayload() instanceof 
ConnectionOpenBody)
-                    throw new AMQException("Incoming frame on unopened channel 
not a Connection.Open method");
+                if (!(requestBody.getMethodPayload() instanceof 
ChannelOpenBody))
+                    throw new AMQException("Incoming frame on unopened channel 
is not a Channel.Open method.");
                 if (requestBody.getRequestId() != 1)
-                    throw new AMQException("Incoming Connection.Open frame on 
unopened channel does not have a request id = 1");
+                    throw new AMQException("Incoming Channel.Open frame on 
unopened channel does not have a request id = 1.");
                 channel = createChannel(frame.channel);
             }
 
@@ -391,17 +411,36 @@
             channel.rollback();
         }
     }
+    
+    // Used to initiate a channel close from the server side and inform the 
client
+    public void closeChannelRequest(int channelId, int replyCode, String 
replyText) throws AMQException
+    {
+        final AMQChannel channel = _channelMap.get(channelId);
+        if (channel == null)
+        {
+            throw new IllegalArgumentException("Unknown channel id " + 
channelId);
+        }
+        else
+        {
+            channel.close(this);
+            // Be aware of possible changes to parameter order as versions 
change.
+            AMQMethodBody cf = ChannelCloseBody.createMethodBody
+                (_major, _minor,       // AMQP version (major, minor)
+                MessageTransferBody.getClazz((byte)0, (byte)9),        // 
classId
+                MessageTransferBody.getMethod((byte)0, (byte)9),       // 
methodId
+                replyCode,     // replyCode
+                replyText);    // replyText
+            writeRequest(channelId, cf);
+            // Wait a bit for the Channel.CloseOk to come in from the client, 
but don't
+            // rely on it. Attempt to remove the channel from the list if the 
ChannelCloseOk
+            // method handler has not already done so.
+            // TODO - Find a better way of doing this without holding up this 
thread...
+            try { Thread.currentThread().sleep(2000); } // 2 seconds
+            catch (InterruptedException e) {}
+            _channelMap.remove(channelId); // Returns null if already removed 
(by closeOk handler
+        }
+    }
 
-    /**
-     * Close a specific channel. This will remove any resources used by the 
channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they 
are auto delete</li>
-     * </ul>
-     *
-     * @param channelId id of the channel to close
-     * @param requestId RequestId of recieved Channel.Close reuqest, used to 
send Channel.CloseOk response
-     * @throws AMQException if an error occurs closing the channel
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
     // Used to close a channel as a response to a client close request
     public void closeChannelResponse(int channelId, long requestId) throws 
AMQException
     {
@@ -425,33 +464,52 @@
             }
         }
     }
+
+    // Used to initiate a connection close from the server side and inform the 
client
+    public void closeSessionRequest(int replyCode, String replyText, int 
classId, int methodId) throws AMQException
+    {
+        _closePending = true; // This prevents all methods except Close-Ok 
from being accepted
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+            _major, _minor,    // AMQP version (major, minor)
+            classId,           // classId
+            methodId,  // methodId
+            replyCode, // replyCode
+            replyText);        // replyText
+        writeRequest(0, close);        
+        // Wait a bit for the Connection.CloseOk to come in from the client, 
but don't
+        // rely on it. Attempt to close the connection if the ConnectionCloseOk
+        // method handler has not already done so.
+        // TODO - Find a better way of doing this without holding up this 
thread...
+        try { Thread.currentThread().sleep(2000); } // 2 seconds
+        catch (InterruptedException e) {}
+        closeSession();
+    }
     
-    // Used to close a channel from the server side and inform the client
-    public void closeChannelRequest(int channelId, int replyCode, String 
replyText) throws AMQException
+    public void closeSessionRequest(int replyCode, String replyText) throws 
AMQException
     {
-        final AMQChannel channel = _channelMap.get(channelId);
-        if (channel == null)
-        {
-            throw new IllegalArgumentException("Unknown channel id");
-        }
-        else
+        closeSessionRequest(replyCode, replyText, 0, 0);
+    }
+    
+    // Used to close a connection as a response to a client close request
+    public void closeSessionResponse(long requestId) throws AMQException
+    {
+        // Be aware of possible changes to parameter order as versions change.
+        writeResponse(0, requestId, 
ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version
+        closeSession();
+    }
+    
+    public void closeSession() throws AMQException
+    {
+        if (!_closed)
         {
-            channel.close(this);
-            // Be aware of possible changes to parameter order as versions 
change.
-            AMQMethodBody cf = ChannelCloseBody.createMethodBody
-                (_major, _minor,       // AMQP version (major, minor)
-                MessageTransferBody.getClazz((byte)0, (byte)9),        // 
classId
-                MessageTransferBody.getMethod((byte)0, (byte)9),       // 
methodId
-                replyCode,     // replyCode
-                replyText);    // replyText
-            writeRequest(channelId, cf);
-            // Wait a bit for the Channel.CloseOk to come in from the client, 
but don't
-            // rely on it. Attempt to remove the channel from the list if the 
ChannelCloseOk
-            // method handler has not already done so.
-            // TODO - Find a better way of doing this without holding up this 
thread...
-            try { Thread.currentThread().sleep(2000); } // 2 seconds
-            catch (InterruptedException e) {}
-            _channelMap.remove(channelId); // Returns null if already removed
+            _closed = true;
+            closeAllChannels();
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            if (_managedObject != null)
+            {
+                _managedObject.unregister();
+            }        
         }
     }
 
@@ -492,23 +550,6 @@
             channel.close(this);
         }
         _channelMap.clear();
-    }
-
-    /**
-     * This must be called when the session is _closed in order to free up any 
resources
-     * managed by the session.
-     */
-    public void closeSession() throws AMQException
-    {
-        if (!_closed)
-        {
-            _closed = true;
-            closeAllChannels();
-            if (_managedObject != null)
-            {
-                _managedObject.unregister();
-            }
-        }
     }
 
     public String toString()

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 Mon Jan 22 12:58:01 2007
@@ -174,18 +174,9 @@
         }
         else
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, 
minor=9)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
-               (byte)0, (byte)9,       // AMQP version (major, minor)
-               0,      // classId
-                0,     // methodId
-                200,   // replyCode
-                throwable.getMessage());       // replyText
-            session.writeRequest(0, closeBody, methodListener);
             _logger.error("Exception caught in" + session + ", closing session 
explictly: " + throwable, throwable);
-            protocolSession.close();
+            // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be 
right!
+            session.closeSessionRequest(200, throwable.getMessage());
         }
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Mon Jan 22 12:58:01 2007
@@ -72,19 +72,18 @@
      */
     void addChannel(AMQChannel channel) throws AMQException;
 
-    /**
-     * Close a specific channel. This will remove any resources used by the 
channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they 
are auto delete</li>
-     * </ul>
-     * @param channelId id of the channel to close
-     * @param requestId id of the request that initiated the close, used in 
response
-     * @throws org.apache.qpid.AMQException if an error occurs closing the 
channel
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
+    void closeChannelRequest(int channelId, int replyCode, String replyText) 
throws AMQException;
+    
     void closeChannelResponse(int channelId, long requestId) throws 
AMQException;
     
-    void closeChannelRequest(int channelId, int replyCode, String replyText) 
throws AMQException;
+    void closeSessionRequest(int replyCode, String replyText, int classId, int 
methodId) throws AMQException;
 
+    void closeSessionRequest(int replyCode, String replyText) throws 
AMQException;
+    
+    void closeSessionResponse(long requestId) throws AMQException;
+    
+    void closeSession() throws AMQException;
+    
     /**
      * Remove a channel from the session but do not close it.
      * @param channelId
@@ -96,12 +95,6 @@
      * @param delay delay in seconds (not ms)
      */
     void initHeartbeats(int delay);
-
-    /**
-     * This must be called when the session is _closed in order to free up any 
resources
-     * managed by the session.
-     */
-    void closeSession() throws AMQException;
 
     /**
      * @return a key that uniquely identifies this session

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
 Mon Jan 22 12:58:01 2007
@@ -22,12 +22,16 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
 
 public class RequestManager
 {
+    private static final Logger logger = 
Logger.getLogger(RequestManager.class);
+
     private int channel;
     private AMQProtocolWriter protocolWriter;
     
@@ -71,7 +75,11 @@
             lastProcessedResponseId, requestMethodBody);
         requestSentMap.put(requestId, methodListener);
         protocolWriter.writeFrame(requestFrame);
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + 
channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + 
requestMethodBody);
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + 
channel +
+                " Req[" + requestId + " " + lastProcessedResponseId + "]; " + 
requestMethodBody);
+        }
         return requestId;
     }
 
@@ -80,7 +88,11 @@
     {
         long requestIdStart = responseBody.getRequestId();
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + 
channel + " " + responseBody + "; " + responseBody.getMethodPayload());
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + 
channel +
+                " " + responseBody + "; " + responseBody.getMethodPayload());
+        }
         for (long requestId = requestIdStart; requestId <= requestIdStop; 
requestId++)
         {
             AMQMethodListener methodListener = requestSentMap.get(requestId);

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
 Mon Jan 22 12:58:01 2007
@@ -23,6 +23,8 @@
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -30,6 +32,8 @@
 
 public class ResponseManager
 {
+    private static final Logger logger = 
Logger.getLogger(ResponseManager.class);
+
     private int channel;
     private AMQMethodListener methodListener;
     private AMQProtocolWriter protocolWriter;
@@ -113,12 +117,15 @@
     public void requestReceived(AMQRequestBody requestBody) throws Exception
     {
         long requestId = requestBody.getRequestId();
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + 
channel + " " + requestBody + "; " + requestBody.getMethodPayload());
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + 
channel +
+                " " + requestBody + "; " + requestBody.getMethodPayload());
+        }
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
         responseMap.put(requestId, new ResponseStatus(requestId));
-        // TODO: Update MethodEvent to use the RequestBody instead of 
MethodBody
         AMQMethodEvent methodEvent = new AMQMethodEvent(channel, 
requestBody.getMethodPayload(), requestId);
         methodListener.methodReceived(methodEvent);
     }
@@ -126,7 +133,11 @@
     public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
         throws RequestResponseMappingException
     {
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + 
channel + " Res[# " + requestId + "]; " + responseMethodBody);
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + 
channel +
+                " Res[# " + requestId + "]; " + responseMethodBody);
+        }
         ResponseStatus responseStatus = responseMap.get(requestId);
         if (responseStatus == null)
             throw new RequestResponseMappingException(requestId,


Reply via email to