Author: kpvdr
Date: Mon Jan 15 11:59:11 2007
New Revision: 496456
URL: http://svn.apache.org/viewvc?view=rev&rev=496456
Log:
Added a request/response id to the MethodEvent class that is used to dispatch
incoming messages to the handlers. Corrected some compile errors.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Mon Jan 15 11:59:11 2007
@@ -38,6 +38,8 @@
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.AMQResponseCallback;
import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -64,6 +66,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
public class AMQMinaProtocolSession implements AMQProtocolSession,
+ AMQResponseCallback,
ProtocolVersionList,
Managable
{
@@ -179,14 +182,14 @@
String mechanisms =
ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
// Interfacing with generated code - be aware of possible
changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short)
0,
+ AMQMethodBody connectionStartBody =
ConnectionStartBody.createMethodBody(
(byte)_major, (byte)_minor, // AMQP version (major, minor)
locales.getBytes(), // locales
mechanisms.getBytes(), // mechanisms
null, // serverProperties
(short)_major, // versionMajor
(short)_minor); // versionMinor
- _minaProtocolSession.write(response);
+ writeRequest(0, connectionStartBody, this);
}
catch (AMQException e)
{
@@ -223,42 +226,47 @@
}
}
- private void requestFrameReceived(int channel, AMQRequestBody requestBody)
throws AMQException
+ public void responseFrameReceived(AMQResponseBody responseBody)
+ {
+ // do nothing
+ }
+
+ private void requestFrameReceived(int channelNum, AMQRequestBody
requestBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Request frame received: " + frame);
+ _logger.debug("Request frame received: " + requestBody);
}
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
responseManager.requestReceived(requestBody);
}
- private void responseFrameReceived(int channel, AMQResponseBody
responseBody) throws AMQException
+ private void responseFrameReceived(int channelNum, AMQResponseBody
responseBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Response frame received: " + frame);
+ _logger.debug("Response frame received: " + responseBody);
}
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
requestManager.responseReceived(responseBody);
}
- public long writeRequest(int channel, AMQMethodBody methodBody,
AMQResponseCallback responseCallback)
+ public long writeRequest(int channelNum, AMQMethodBody methodBody,
AMQResponseCallback responseCallback)
throws RequestResponseMappingException
{
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, responseCallback);
}
- public void writeResponse(int channel, long requestId, AMQMethodBody
methodBody)
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody
methodBody)
throws RequestResponseMappingException
{
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
- responseManager(requestId, methodBody);
+ responseManager.sendResponse(requestId, methodBody);
}
/**
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
Mon Jan 15 11:59:11 2007
@@ -49,7 +49,7 @@
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements
ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements
ProtocolVersionList, AMQResponseCallback
{
private static final Logger _logger =
Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -175,16 +175,21 @@
// AMQP version change: Hardwire the version to 0-9 (major=0,
minor=9)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions
change.
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
0, // classId
0, // methodId
200, // replyCode
- throwable.getMessage() // replyText
- ));
+ throwable.getMessage()); // replyText
+ protocolSession.writeRequest(0, closeBody, this);
_logger.error("Exception caught in" + session + ", closing session
explictly: " + throwable, throwable);
protocolSession.close();
}
+ }
+
+ public void responseFrameReceived(AMQResponseBody responseBody)
+ {
+ // do nothing
}
/**
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Mon Jan 15 11:59:11 2007
@@ -111,7 +111,7 @@
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
// TODO: Update MethodEvent to use the RequestBody instead of
MethodBody
- AMQMethodEvent methodEvent = new AMQMethodEvent(channel,
requestBody.getMethodPayload());
+ AMQMethodEvent methodEvent = new AMQMethodEvent(channel,
requestBody.getMethodPayload(), requestId);
methodListener.methodReceived(methodEvent);
}
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java?view=diff&rev=496456&r1=496455&r2=496456
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
Mon Jan 15 11:59:11 2007
@@ -36,13 +36,14 @@
public class AMQMethodEvent<M extends AMQMethodBody>
{
private final M _method;
-
private final int _channelId;
+ private final long _requestResponseId;
- public AMQMethodEvent(int channelId, M method)
+ public AMQMethodEvent(int channelId, M method, long requestResponseId)
{
_channelId = channelId;
_method = method;
+ _requestResponseId = requestResponseId;
}
public M getMethod()
@@ -55,11 +56,17 @@
return _channelId;
}
+ public long getRequestResponseId()
+ {
+ return _requestResponseId;
+ }
+
public String toString()
{
- StringBuilder buf = new StringBuilder("Method event: ");
- buf.append("\nChannel id: ").append(_channelId);
- buf.append("\nMethod: ").append(_method);
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request/Response Id: ").append(_requestResponseId);
return buf.toString();
}
}