Author: ritchiem
Date: Sun Feb 24 20:56:42 2008
New Revision: 630733
URL: http://svn.apache.org/viewvc?rev=630733&view=rev
Log:
QPID-810 : Moved check for closingChannels higher in stack and close channel on
any AMQException being thrown from the body.handle methods.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=630733&r1=630732&r2=630733&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Sun Feb 24 20:56:42 2008
@@ -109,7 +109,7 @@
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private List<Integer> _closingChannelsList = new
CopyOnWriteArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -208,9 +208,39 @@
{
_logger.debug("Frame Received: " + frame);
}
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
+ {
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure
- processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure
ignoring");
+ }
+
+ return;
+ }
+ }
- body.handle(channelId, this);
+
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
@@ -259,27 +289,6 @@
final AMQMethodEvent<AMQMethodBody> evt = new
AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((evt.getMethod() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure
- processing close-ok");
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure
ignoring");
- }
-
- return;
- }
- }
-
try
{
try
@@ -341,6 +350,7 @@
_logger.info("Closing connection due to: " +
e.getMessage());
}
+ markChannelawaitingCloseOk(channelId);
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));