Author: gsim
Date: Thu Jan 25 10:24:48 2007
New Revision: 499880
URL: http://svn.apache.org/viewvc?view=rev&rev=499880
Log:
Improved channel/connection exception handling.
Added:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
(with props)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java	(original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java	Thu Jan 25 10:24:48 2007
@@ -25,6 +25,7 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQProtocolVersionException;
@@ -246,7 +247,7 @@
throw new AMQException("Incoming request frame on
connection which is pending close.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
if (!(requestBody.getMethodPayload() instanceof
ConnectionCloseOkBody))
- throw new AMQException("Incoming frame on unopened channel
is not a Connection.Open method.");
+ throw new AMQException("Incoming frame on closing
connection is not a Connection.CloseOk method.");
}
else if (channel == null)
{
@@ -259,8 +260,13 @@
if(!(frame.bodyFrame instanceof AMQRequestBody))
throw new AMQException("Incoming frame on unopened channel
is not a request.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
- if (!(requestBody.getMethodPayload() instanceof
ChannelOpenBody))
- throw new AMQException("Incoming frame on unopened channel
is not a Channel.Open method.");
+ if (!(requestBody.getMethodPayload() instanceof
ChannelOpenBody)) {
+ closeSessionRequest(
+ requestBody.getMethodPayload().getConnectionException(
+ 504, "Incoming frame on unopened channel is not a
Connection.Open method."
+ )
+ );
+ }
if (requestBody.getRequestId() != 1)
throw new AMQException("Incoming Channel.Open frame on
unopened channel does not have a request id = 1.");
channel = createChannel(frame.channel);
@@ -283,13 +289,29 @@
private void requestFrameReceived(int channelNum, AMQRequestBody
requestBody) throws Exception
{
- if (_logger.isDebugEnabled())
+ try{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+ AMQChannel channel = getChannel(channelNum);
+ ResponseManager responseManager = channel.getResponseManager();
+ responseManager.requestReceived(requestBody);
+ }
+ catch (AMQChannelException e)
+ {
+ _logger.error("Closing channel due to: " + e.getMessage());
+ writeRequest(channelNum, e.getCloseMethodBody());
+ AMQChannel channel = _channelMap.remove(channelNum);
+ if (channel != null) {
+ channel.close(this);
+ }
+ }
+ catch (AMQConnectionException e)
{
- _logger.debug("Request frame received: " + requestBody);
+ _logger.error("Closing connection due to: " + e.getMessage());
+ closeSessionRequest(e);
}
- AMQChannel channel = getChannel(channelNum);
- ResponseManager responseManager = channel.getResponseManager();
- responseManager.requestReceived(requestBody);
}
private void responseFrameReceived(int channelNum, AMQResponseBody
responseBody) throws Exception
@@ -490,6 +512,13 @@
{
closeSessionRequest(replyCode, replyText, 0, 0);
}
+
+
+ public void closeSessionRequest(AMQConnectionException e) throws
AMQException
+ {
+ closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(),
e.getMethodId());
+ }
+
// Used to close a connection as a response to a client close request
public void closeSessionResponse(long requestId) throws AMQException
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java	(original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java	Thu Jan 25 10:24:48 2007
@@ -52,6 +52,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
@@ -288,7 +289,7 @@
message = "Unable to Connect";
}
- AMQException e = new AMQConnectionException(message);
+ AMQException e = new AMQConnectionFailureException(message);
if (lastException != null)
{
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java	(original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java	Thu Jan 25 10:24:48 2007
@@ -21,10 +21,46 @@
package org.apache.qpid;
+import org.apache.qpid.framing.ConnectionCloseBody;
+
public class AMQConnectionException extends AMQException
{
- public AMQConnectionException(String message)
+ private final int _classId;
+ private final int _methodId;
+ /* AMQP version for which the exception occurred */
+ private final byte major;
+ private final byte minor;
+
+ public AMQConnectionException(int errorCode, String msg, int classId, int
methodId, byte major, byte minor, Throwable t)
{
- super(message);
+ super(errorCode, msg, t);
+ _classId = classId;
+ _methodId = methodId;
+ this.major = major;
+ this.minor = minor;
}
+
+ public AMQConnectionException(int errorCode, String msg, int classId, int
methodId, byte major, byte minor)
+ {
+ super(errorCode, msg);
+ _classId = classId;
+ _methodId = methodId;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public ConnectionCloseBody getCloseMethodBody()
+ {
+ return ConnectionCloseBody.createMethodBody(major, minor, _classId,
_methodId, getErrorCode(), getMessage());
+ }
+
+ public int getClassId()
+ {
+ return _classId;
+ }
+
+ public int getMethodId(){
+ return _methodId;
+ }
+
}
Added:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?view=auto&rev=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java	(added)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java	Thu Jan 25 10:24:48 2007
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message)
+ {
+ super(message);
+ }
+}
Propchange:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java	(original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java	Thu Jan 25 10:24:48 2007
@@ -22,6 +22,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
public abstract class AMQMethodBody extends AMQBody
{
@@ -102,5 +103,17 @@
public AMQChannelException getChannelException(int code, String message,
Throwable cause)
{
return new AMQChannelException(code, message, getClazz(), getMethod(),
major, minor, cause);
+ }
+
+ public AMQConnectionException getConnectionException(int code, String
message)
+ {
+ return new AMQConnectionException(code, message, getClazz(),
getMethod(), major, minor);
+ }
+
+
+
+ public AMQConnectionException getConnectionException(int code, String
message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(),
getMethod(), major, minor, cause);
}
}