Author: kpvdr
Date: Mon Jan 15 13:24:15 2007
New Revision: 496499
URL: http://svn.apache.org/viewvc?view=rev&rev=496499
Log:
Changed the RequestManager to use AMQMethodListener instead of the old
AMQResponseCallback.
Removed:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 13:24:15 2007
@@ -36,7 +36,6 @@
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.AMQResponseCallback;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
@@ -66,7 +65,6 @@
import java.util.concurrent.CopyOnWriteArraySet;
public class AMQMinaProtocolSession implements AMQProtocolSession,
- AMQResponseCallback,
ProtocolVersionList,
Managable
{
@@ -189,7 +187,7 @@
null, // serverProperties
(short)_major, // versionMajor
(short)_minor); // versionMinor
- writeRequest(0, connectionStartBody, this);
+ writeRequest(0, connectionStartBody, _stateManager);
}
catch (AMQException e)
{
@@ -226,11 +224,6 @@
}
}
- public void responseFrameReceived(AMQResponseBody responseBody)
- {
- // do nothing
- }
-
private void requestFrameReceived(int channelNum, AMQRequestBody
requestBody) throws AMQException
{
if (_logger.isDebugEnabled())
@@ -253,12 +246,12 @@
requestManager.responseReceived(responseBody);
}
- public long writeRequest(int channelNum, AMQMethodBody methodBody,
AMQResponseCallback responseCallback)
+ public long writeRequest(int channelNum, AMQMethodBody methodBody,
AMQMethodListener methodListener)
throws RequestResponseMappingException
{
AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
- return requestManager.sendRequest(methodBody, responseCallback);
+ return requestManager.sendRequest(methodBody, methodListener);
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody
methodBody)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Jan 15 13:24:15 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -49,7 +50,7 @@
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements
ProtocolVersionList, AMQResponseCallback
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements
ProtocolVersionList
{
private static final Logger _logger =
Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -153,7 +154,8 @@
}
- public void exceptionCaught(IoSession protocolSession, Throwable
throwable) throws Exception
+ public void exceptionCaught(IoSession protocolSession, AMQMethodListener
methodListener,
+ Throwable throwable) throws Exception
{
AMQProtocolSession session =
AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
if (throwable instanceof AMQProtocolHeaderException)
@@ -181,15 +183,10 @@
0, // methodId
200, // replyCode
throwable.getMessage()); // replyText
- protocolSession.writeRequest(0, closeBody, this);
+ session.writeRequest(0, closeBody, methodListener);
_logger.error("Exception caught in" + session + ", closing session
explictly: " + throwable, throwable);
protocolSession.close();
}
- }
-
- public void responseFrameReceived(AMQResponseBody responseBody)
- {
- // do nothing
}
/**
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 15 13:24:15 2007
@@ -22,6 +22,8 @@
import java.util.Hashtable;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
@@ -41,7 +43,7 @@
*/
private long lastProcessedResponseId;
- private Hashtable<Long, AMQResponseCallback> requestSentMap;
+ private Hashtable<Long, AMQMethodListener> requestSentMap;
public RequestManager(int channel, AMQProtocolWriter protocolWriter)
{
@@ -49,34 +51,36 @@
this.protocolWriter = protocolWriter;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
- requestSentMap = new Hashtable<Long, AMQResponseCallback>();
+ requestSentMap = new Hashtable<Long, AMQMethodListener>();
}
// *** Functions to originate a request ***
public long sendRequest(AMQMethodBody requestMethodBody,
- AMQResponseCallback responseCallback)
+ AMQMethodListener methodListener)
{
long requestId = getNextRequestId(); // Get new request ID
AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel,
requestId,
lastProcessedResponseId, requestMethodBody);
protocolWriter.writeFrame(requestFrame);
- requestSentMap.put(requestId, responseCallback);
+ requestSentMap.put(requestId, methodListener);
return requestId;
}
public void responseReceived(AMQResponseBody responseBody)
- throws RequestResponseMappingException
+ throws Exception
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
for (long requestId = requestIdStart; requestId <= requestIdStop;
requestId++)
{
- AMQResponseCallback responseCallback =
requestSentMap.get(requestId);
- if (responseCallback == null)
+ AMQMethodListener methodListener = requestSentMap.get(requestId);
+ if (methodListener == null)
throw new RequestResponseMappingException(requestId,
"Failed to locate requestId " + requestId + " in
requestSentMap.");
- responseCallback.responseFrameReceived(responseBody);
+ AMQMethodEvent methodEvent = new AMQMethodEvent(channel,
responseBody.getMethodPayload(),
+ requestId);
+ methodListener.methodReceived(methodEvent);
requestSentMap.remove(requestId);
}
lastProcessedResponseId = responseBody.getResponseId();
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java Mon Jan 15 13:24:15 2007
@@ -37,13 +37,13 @@
{
private final M _method;
private final int _channelId;
- private final long _requestResponseId;
+ private final long _requestId;
- public AMQMethodEvent(int channelId, M method, long requestResponseId)
+ public AMQMethodEvent(int channelId, M method, long requestId)
{
_channelId = channelId;
_method = method;
- _requestResponseId = requestResponseId;
+ _requestId = requestId;
}
public M getMethod()
@@ -56,9 +56,9 @@
return _channelId;
}
- public long getRequestResponseId()
+ public long getRequestId()
{
- return _requestResponseId;
+ return _requestId;
}
public String toString()
@@ -66,7 +66,7 @@
StringBuilder buf = new StringBuilder("Method event: \n");
buf.append("Channel id: \n").append(_channelId);
buf.append("Method: \n").append(_method);
- buf.append("Request/Response Id: ").append(_requestResponseId);
+ buf.append("Request Id: ").append(_requestId);
return buf.toString();
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=496499&r1=496498&r2=496499
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java Mon Jan 15 13:24:15 2007
@@ -21,6 +21,9 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.protocol.AMQMethodListener;
public interface AMQProtocolWriter
{
@@ -29,4 +32,10 @@
* @param frame the frame to be encoded and written
*/
public void writeFrame(AMQDataBlock frame);
+
+ public long writeRequest(int channelNum, AMQMethodBody methodBody,
+ AMQMethodListener methodListener) throws
RequestResponseMappingException;
+
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody
methodBody)
+ throws RequestResponseMappingException;
}