Author: kpvdr
Date: Mon Jan 15 13:24:15 2007
New Revision: 496499

URL: http://svn.apache.org/viewvc?view=rev&rev=496499
Log:
Changed the RequestManager to use AMQMethodListener instead of the old 
AMQResponseCallback.

Removed:
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java
Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Mon Jan 15 13:24:15 2007
@@ -36,7 +36,6 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.AMQResponseCallback;
 import org.apache.qpid.framing.HeartbeatBody;
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.ResponseManager;
@@ -66,7 +65,6 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 
 public class AMQMinaProtocolSession implements AMQProtocolSession,
-                                               AMQResponseCallback,
                                                ProtocolVersionList,
                                                Managable
 {
@@ -189,7 +187,7 @@
                     null,      // serverProperties
                     (short)_major,     // versionMajor
                     (short)_minor);    // versionMinor
-                writeRequest(0, connectionStartBody, this);
+                writeRequest(0, connectionStartBody, _stateManager);
             }
             catch (AMQException e)
             {
@@ -226,11 +224,6 @@
         }
     }
     
-       public void responseFrameReceived(AMQResponseBody responseBody)
-    {
-        // do nothing
-    }
-    
     private void requestFrameReceived(int channelNum, AMQRequestBody 
requestBody) throws AMQException
     {
         if (_logger.isDebugEnabled())
@@ -253,12 +246,12 @@
         requestManager.responseReceived(responseBody);
     }
 
-    public long writeRequest(int channelNum, AMQMethodBody methodBody, 
AMQResponseCallback responseCallback)
+    public long writeRequest(int channelNum, AMQMethodBody methodBody, 
AMQMethodListener methodListener)
         throws RequestResponseMappingException
     {
         AMQChannel channel = getChannel(channelNum);
         RequestManager requestManager = channel.getRequestManager();
-        return requestManager.sendRequest(methodBody, responseCallback);
+        return requestManager.sendRequest(methodBody, methodListener);
     }
 
     public void writeResponse(int channelNum, long requestId, AMQMethodBody 
methodBody)

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 Mon Jan 15 13:24:15 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -49,7 +50,7 @@
  * the state for the connection.
  *
  */
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements 
ProtocolVersionList, AMQResponseCallback
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements 
ProtocolVersionList
 {
     private static final Logger _logger = 
Logger.getLogger(AMQPFastProtocolHandler.class);
 
@@ -153,7 +154,8 @@
 
     }
 
-    public void exceptionCaught(IoSession protocolSession, Throwable 
throwable) throws Exception
+    public void exceptionCaught(IoSession protocolSession, AMQMethodListener 
methodListener,
+        Throwable throwable) throws Exception
     {
         AMQProtocolSession session = 
AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
         if (throwable instanceof AMQProtocolHeaderException)
@@ -181,15 +183,10 @@
                 0,     // methodId
                 200,   // replyCode
                 throwable.getMessage());       // replyText
-            protocolSession.writeRequest(0, closeBody, this);
+            session.writeRequest(0, closeBody, methodListener);
             _logger.error("Exception caught in" + session + ", closing session 
explictly: " + throwable, throwable);
             protocolSession.close();
         }
-    }
-    
-       public void responseFrameReceived(AMQResponseBody responseBody)
-    {
-        // do nothing
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
 Mon Jan 15 13:24:15 2007
@@ -22,6 +22,8 @@
 
 import java.util.Hashtable;
 
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
 
 public class RequestManager
@@ -41,7 +43,7 @@
      */
     private long lastProcessedResponseId;
 
-    private Hashtable<Long, AMQResponseCallback> requestSentMap;
+    private Hashtable<Long, AMQMethodListener> requestSentMap;
 
     public RequestManager(int channel, AMQProtocolWriter protocolWriter)
     {
@@ -49,34 +51,36 @@
         this.protocolWriter = protocolWriter;
         requestIdCount = 1L;
         lastProcessedResponseId = 0L;
-        requestSentMap = new Hashtable<Long, AMQResponseCallback>();
+        requestSentMap = new Hashtable<Long, AMQMethodListener>();
     }
 
     // *** Functions to originate a request ***
 
     public long sendRequest(AMQMethodBody requestMethodBody,
-        AMQResponseCallback responseCallback)
+        AMQMethodListener methodListener)
     {
         long requestId = getNextRequestId(); // Get new request ID
         AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, 
requestId,
             lastProcessedResponseId, requestMethodBody);
         protocolWriter.writeFrame(requestFrame);
-        requestSentMap.put(requestId, responseCallback);
+        requestSentMap.put(requestId, methodListener);
         return requestId;
     }
 
     public void responseReceived(AMQResponseBody responseBody)
-        throws RequestResponseMappingException
+        throws Exception
     {
         long requestIdStart = responseBody.getRequestId();
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
         for (long requestId = requestIdStart; requestId <= requestIdStop; 
requestId++)
         {
-            AMQResponseCallback responseCallback = 
requestSentMap.get(requestId);
-            if (responseCallback == null)
+            AMQMethodListener methodListener = requestSentMap.get(requestId);
+            if (methodListener == null)
                 throw new RequestResponseMappingException(requestId,
                     "Failed to locate requestId " + requestId + " in 
requestSentMap.");
-            responseCallback.responseFrameReceived(responseBody);
+            AMQMethodEvent methodEvent = new AMQMethodEvent(channel, 
responseBody.getMethodPayload(),
+                requestId);
+            methodListener.methodReceived(methodEvent);
             requestSentMap.remove(requestId);
         }
         lastProcessedResponseId = responseBody.getResponseId();

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
 Mon Jan 15 13:24:15 2007
@@ -37,13 +37,13 @@
 {
     private final M _method;
     private final int _channelId;
-    private final long _requestResponseId;
+    private final long _requestId;
 
-    public AMQMethodEvent(int channelId, M method, long requestResponseId)
+    public AMQMethodEvent(int channelId, M method, long requestId)
     {
         _channelId = channelId;
         _method = method;
-        _requestResponseId = requestResponseId;
+        _requestId = requestId;
     }
 
     public M getMethod()
@@ -56,9 +56,9 @@
         return _channelId;
     }
 
-    public long getRequestResponseId()
+    public long getRequestId()
     {
-        return _requestResponseId;
+        return _requestId;
     }
 
     public String toString()
@@ -66,7 +66,7 @@
         StringBuilder buf = new StringBuilder("Method event: \n");
         buf.append("Channel id: \n").append(_channelId);
         buf.append("Method: \n").append(_method);
-        buf.append("Request/Response Id: ").append(_requestResponseId);
+        buf.append("Request Id: ").append(_requestId);
         return buf.toString();
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
 Mon Jan 15 13:24:15 2007
@@ -21,6 +21,9 @@
 package org.apache.qpid.protocol;
 
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.protocol.AMQMethodListener;
 
 public interface AMQProtocolWriter
 {
@@ -29,4 +32,10 @@
      * @param frame the frame to be encoded and written
      */
        public void writeFrame(AMQDataBlock frame);
+    
+    public long writeRequest(int channelNum, AMQMethodBody methodBody,
+        AMQMethodListener methodListener) throws 
RequestResponseMappingException;
+
+    public void writeResponse(int channelNum, long requestId, AMQMethodBody 
methodBody)
+        throws RequestResponseMappingException;
 }


Reply via email to