Author: kpvdr
Date: Mon Jan 15 11:59:11 2007
New Revision: 496456

URL: http://svn.apache.org/viewvc?view=rev&rev=496456
Log:
Added a request/response id to the MethodEvent class that is used to dispatch 
incoming messages to the handlers. Corrected some compile errors.

Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Mon Jan 15 11:59:11 2007
@@ -38,6 +38,8 @@
 import org.apache.qpid.framing.AMQResponseBody;
 import org.apache.qpid.framing.AMQResponseCallback;
 import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
@@ -64,6 +66,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 
 public class AMQMinaProtocolSession implements AMQProtocolSession,
+                                               AMQResponseCallback,
                                                ProtocolVersionList,
                                                Managable
 {
@@ -179,14 +182,14 @@
                 String mechanisms = 
ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
                 // Interfacing with generated code - be aware of possible 
changes to parameter order as versions change.
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 
0,
+                AMQMethodBody connectionStartBody = 
ConnectionStartBody.createMethodBody(
                    (byte)_major, (byte)_minor, // AMQP version (major, minor)
                     locales.getBytes(),        // locales
                     mechanisms.getBytes(),     // mechanisms
                     null,      // serverProperties
                     (short)_major,     // versionMajor
                     (short)_minor);    // versionMinor
-                _minaProtocolSession.write(response);
+                writeRequest(0, connectionStartBody, this);
             }
             catch (AMQException e)
             {
@@ -223,42 +226,47 @@
         }
     }
     
-    private void requestFrameReceived(int channel, AMQRequestBody requestBody) 
throws AMQException
+       public void responseFrameReceived(AMQResponseBody responseBody)
+    {
+        // do nothing
+    }
+    
+    private void requestFrameReceived(int channelNum, AMQRequestBody 
requestBody) throws AMQException
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Request frame received: " + frame);
+            _logger.debug("Request frame received: " + requestBody);
         }
-        AMQChannel channel = getChannel(channel);
+        AMQChannel channel = getChannel(channelNum);
         ResponseManager responseManager = channel.getResponseManager();
         responseManager.requestReceived(requestBody);
     }
     
-    private void responseFrameReceived(int channel, AMQResponseBody 
responseBody) throws AMQException
+    private void responseFrameReceived(int channelNum, AMQResponseBody 
responseBody) throws AMQException
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Response frame received: " + frame);
+            _logger.debug("Response frame received: " + responseBody);
         }
-        AMQChannel channel = getChannel(channel);
+        AMQChannel channel = getChannel(channelNum);
         RequestManager requestManager = channel.getRequestManager();
         requestManager.responseReceived(responseBody);
     }
 
-    public long writeRequest(int channel, AMQMethodBody methodBody, 
AMQResponseCallback responseCallback)
+    public long writeRequest(int channelNum, AMQMethodBody methodBody, 
AMQResponseCallback responseCallback)
         throws RequestResponseMappingException
     {
-        AMQChannel channel = getChannel(channel);
+        AMQChannel channel = getChannel(channelNum);
         RequestManager requestManager = channel.getRequestManager();
         return requestManager.sendRequest(methodBody, responseCallback);
     }
 
-    public void writeResponse(int channel, long requestId, AMQMethodBody 
methodBody)
+    public void writeResponse(int channelNum, long requestId, AMQMethodBody 
methodBody)
         throws RequestResponseMappingException
     {
-        AMQChannel channel = getChannel(channel);
+        AMQChannel channel = getChannel(channelNum);
         ResponseManager responseManager = channel.getResponseManager();
-        responseManager(requestId, methodBody);
+        responseManager.sendResponse(requestId, methodBody);
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 Mon Jan 15 11:59:11 2007
@@ -49,7 +49,7 @@
  * the state for the connection.
  *
  */
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements 
ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements 
ProtocolVersionList, AMQResponseCallback
 {
     private static final Logger _logger = 
Logger.getLogger(AMQPFastProtocolHandler.class);
 
@@ -175,16 +175,21 @@
             // AMQP version change: Hardwire the version to 0-9 (major=0, 
minor=9)
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions 
change.
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+            AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
                (byte)0, (byte)9,       // AMQP version (major, minor)
                0,      // classId
                 0,     // methodId
                 200,   // replyCode
-                throwable.getMessage() // replyText
-                ));
+                throwable.getMessage());       // replyText
+            protocolSession.writeRequest(0, closeBody, this);
             _logger.error("Exception caught in" + session + ", closing session 
explictly: " + throwable, throwable);
             protocolSession.close();
         }
+    }
+    
+       public void responseFrameReceived(AMQResponseBody responseBody)
+    {
+        // do nothing
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
 Mon Jan 15 11:59:11 2007
@@ -111,7 +111,7 @@
         lastReceivedRequestId = requestId;
         responseMap.put(requestId, new ResponseStatus(requestId));
         // TODO: Update MethodEvent to use the RequestBody instead of 
MethodBody
-        AMQMethodEvent methodEvent = new AMQMethodEvent(channel, 
requestBody.getMethodPayload());
+        AMQMethodEvent methodEvent = new AMQMethodEvent(channel, 
requestBody.getMethodPayload(), requestId);
         methodListener.methodReceived(methodEvent);
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
 Mon Jan 15 11:59:11 2007
@@ -36,13 +36,14 @@
 public class AMQMethodEvent<M extends AMQMethodBody>
 {
     private final M _method;
-
     private final int _channelId;
+    private final long _requestResponseId;
 
-    public AMQMethodEvent(int channelId, M method)
+    public AMQMethodEvent(int channelId, M method, long requestResponseId)
     {
         _channelId = channelId;
         _method = method;
+        _requestResponseId = requestResponseId;
     }
 
     public M getMethod()
@@ -55,11 +56,17 @@
         return _channelId;
     }
 
+    public long getRequestResponseId()
+    {
+        return _requestResponseId;
+    }
+
     public String toString()
     {
-        StringBuilder buf = new StringBuilder("Method event: ");
-        buf.append("\nChannel id: ").append(_channelId);
-        buf.append("\nMethod: ").append(_method);
+        StringBuilder buf = new StringBuilder("Method event: \n");
+        buf.append("Channel id: \n").append(_channelId);
+        buf.append("Method: \n").append(_method);
+        buf.append("Request/Response Id: ").append(_requestResponseId);
         return buf.toString();
     }
 }


Reply via email to