Author: rajith
Date: Wed Jan 17 08:54:19 2007
New Revision: 497078
URL: http://svn.apache.org/viewvc?view=rev&rev=497078
Log:
added error handling and resolved compilation errors
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497078&r1=497077&r2=497078
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Jan 17 08:54:19 2007
@@ -223,11 +223,11 @@
{
if (message.content != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer)
_consumers.get(message.content.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer)
_consumers.get(message.contentHeader.getDestination());
if (consumer == null)
{
- _logger.warn("Received a message from queue " +
message.content.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " +
message.contentHeader.getDestination() + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " +
System.identityHashCode(this));
}
@@ -245,8 +245,8 @@
// Bounced message is processed here, away from the mina
thread
AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0,
false,
-
message.contentHeader,
-
message.bodies);
+ e
message.contentHeader,
+
message.content);
int errorCode = message.bounceBody.replyCode;
String reason = message.bounceBody.replyText;
@@ -316,12 +316,16 @@
_queue = new
FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new
FlowControllingBlockingQueue.ThresholdListener()
{
- public void
aboveThreshold(int currentValue)
+ public void
aboveThreshold(int currentValue)
{
if
(_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending
channel. Current value is " + currentValue);
-
suspendChannel();
+ try{
+
suspendChannel();
+ }catch
(AMQException e) {
+
_logger.error("FlowControllingBlockingQueue,aboveThreshold, Cannot Suspend the
channel",e);
+ }
}
}
@@ -330,7 +334,11 @@
if
(_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending
channel. Current value is " + currentValue);
-
unsuspendChannel();
+ try {
+
unsuspendChannel();
+
} catch (AMQException
e) {
+
_logger.error("FlowControllingBlockingQueue,underThreshold, Cannot Unsuspend
the channel",e);
+
}
}
}
});
@@ -767,9 +775,16 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().writeRequest(_channelId,
- MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP
version (major, minor)
- false)); // requeue
+ try {
+
_connection.getProtocolHandler().writeRequest(_channelId,
+ MessageRecoverBody.createMethodBody((byte)0,
(byte)9, // AMQP version (major, minor)
+ false)); // requeue
+ } catch (AMQException e) {
+ _logger.error("Error recovering",e);
+ JMSException ex = new JMSException("Error Recovering");
+ ex.initCause(e);
+ throw ex;
+ }
}
boolean isInRecovery()
@@ -1094,7 +1109,7 @@
}
- public void declareExchange(String name, String type)
+ public void declareExchange(String name, String type) throws AMQException
{
declareExchange(name, type, _connection.getProtocolHandler());
}
@@ -1118,12 +1133,12 @@
_connection.getProtocolHandler().syncWrite(_channelId, methodBody,
ExchangeDeclareOkBody.class);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler
protocolHandler)
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler
protocolHandler)throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(),
protocolHandler);
}
- private void declareExchange(String name, String type, AMQProtocolHandler
protocolHandler)
+ private void declareExchange(String name, String type, AMQProtocolHandler
protocolHandler) throws AMQException
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
@@ -1139,6 +1154,7 @@
false, // passive
0, // ticket
type); // type
+
protocolHandler.writeRequest(_channelId, methodBody);
}
@@ -1586,8 +1602,9 @@
* @param deliveryTag the tag of the last message to be acknowledged
* @param multiple if true will acknowledge all messages up to and
including the one specified by the
* delivery tag
+ * @throws AMQException
*/
- public void acknowledgeMessage(long requestId, boolean multiple)
+ public void acknowledgeMessage(long requestId, boolean multiple) throws
AMQException
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
@@ -1626,8 +1643,12 @@
{
if (_dispatcher != null)
{
- //then we stopped this and are restarting, so signal server to
resume delivery
- unsuspendChannel();
+ try{
+ //then we stopped this and are restarting, so signal server to
resume delivery
+ unsuspendChannel();
+ }catch(AMQException e){
+ _logger.error("Error Un Suspending Channel", e);
+ }
}
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
@@ -1637,8 +1658,12 @@
void stop()
{
//stop the server delivering messages to this session
- suspendChannel();
-
+ try{
+ suspendChannel();
+ }catch(AMQException e){
+ _logger.error("Error Suspending Channel", e);
+ }
+
//stop the dispatcher thread
_stopped.set(true);
}
@@ -1750,7 +1775,7 @@
}
}
- private void suspendChannel()
+ private void suspendChannel() throws AMQException
{
_logger.warn("Suspending channel");
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
@@ -1762,7 +1787,7 @@
_connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
- private void unsuspendChannel()
+ private void unsuspendChannel() throws AMQException
{
_logger.warn("Unsuspending channel");
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)