Author: rajith
Date: Wed Jan 17 08:54:19 2007
New Revision: 497078

URL: http://svn.apache.org/viewvc?view=rev&rev=497078
Log:
added error handling and resolved compilation errors

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497078&r1=497077&r2=497078
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 08:54:19 2007
@@ -223,11 +223,11 @@
         {
             if (message.content != null)
             {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.contentHeader.getDestination());
 
                 if (consumer == null)
                 {
-                    _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring...");
+                    _logger.warn("Received a message from queue " + message.contentHeader.getDestination() + " without a handler - ignoring...");
                     _logger.warn("Consumers that exist: " + _consumers);
                     _logger.warn("Session hashcode: " + System.identityHashCode(this));
                 }
@@ -245,8 +245,8 @@
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage = 
_messageFactoryRegistry.createMessage(0,
                                                                                
               false,
-                                                                               
               message.contentHeader,
-                                                                               
               message.bodies);
+                            e                                                  
                message.contentHeader,
+                                                                               
               message.content);
 
                     int errorCode = message.bounceBody.replyCode;
                     String reason = message.bounceBody.replyText;
@@ -316,12 +316,16 @@
             _queue = new 
FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
                                                       new 
FlowControllingBlockingQueue.ThresholdListener()
                                                       {
-                                                          public void 
aboveThreshold(int currentValue)
+                                                          public void 
aboveThreshold(int currentValue) 
                                                           {
                                                               if 
(_acknowledgeMode == NO_ACKNOWLEDGE)
                                                               {
                                                                   
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending 
channel. Current value is " + currentValue);
-                                                                  
suspendChannel();
+                                                                 try{
+                                                                       
suspendChannel();
+                                                                 }catch 
(AMQException e) {
+                                                                         
_logger.error("FlowControllingBlockingQueue,aboveThreshold, Cannot Suspend the 
channel",e);
+                                                                 }
                                                               }
                                                           }
 
@@ -330,7 +334,11 @@
                                                               if 
(_acknowledgeMode == NO_ACKNOWLEDGE)
                                                               {
                                                                   
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending 
channel. Current value is " + currentValue);
-                                                                  
unsuspendChannel();
+                                                                       try {
+                                                                               
                                                                
unsuspendChannel();
+                                                                               
                                                        } catch (AMQException 
e) {
+                                                                               
                                                                
_logger.error("FlowControllingBlockingQueue,underThreshold, Cannot Unsuspend 
the channel",e);
+                                                                               
                                                        }
                                                               }
                                                           }
                                                       });
@@ -767,9 +775,16 @@
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        _connection.getProtocolHandler().writeRequest(_channelId,
-            MessageRecoverBody.createMethodBody((byte)0, (byte)9,      // AMQP 
version (major, minor)
-                false));       // requeue
+        try {
+                       
_connection.getProtocolHandler().writeRequest(_channelId,
+                           MessageRecoverBody.createMethodBody((byte)0, 
(byte)9,       // AMQP version (major, minor)
+                               false));        // requeue
+               } catch (AMQException e) {
+                       _logger.error("Error recovering",e);
+                       JMSException ex = new JMSException("Error Recovering");
+                       ex.initCause(e);
+                       throw ex;
+               }
     }
 
     boolean isInRecovery()
@@ -1094,7 +1109,7 @@
     }
 
 
-    public void declareExchange(String name, String type)
+    public void declareExchange(String name, String type) throws AMQException
     {
         declareExchange(name, type, _connection.getProtocolHandler());
     }
@@ -1118,12 +1133,12 @@
         _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class);
     }
 
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
     }
 
-    private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
+    private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) throws AMQException
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1139,6 +1154,7 @@
             false,     // passive
             0, // ticket
             type);     // type
+        
         protocolHandler.writeRequest(_channelId, methodBody);
     }
 
@@ -1586,8 +1602,9 @@
      * @param deliveryTag the tag of the last message to be acknowledged
      * @param multiple    if true will acknowledge all messages up to and including the one specified by the
      *                    delivery tag
+     * @throws AMQException 
      */
-    public void acknowledgeMessage(long requestId, boolean multiple)
+    public void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1626,8 +1643,12 @@
     {
         if (_dispatcher != null)
         {
-            //then we stopped this and are restarting, so signal server to resume delivery
-            unsuspendChannel();
+               try{
+               //then we stopped this and are restarting, so signal server to resume delivery
+                       unsuspendChannel();
+               }catch(AMQException e){
+                       _logger.error("Error Un Suspending Channel", e);
+               }
         }
         _dispatcher = new Dispatcher();
         _dispatcher.setDaemon(true);
@@ -1637,8 +1658,12 @@
     void stop()
     {
         //stop the server delivering messages to this session
-        suspendChannel();
-
+        try{
+               suspendChannel();
+        }catch(AMQException e){
+               _logger.error("Error Suspending Channel", e);
+        }
+        
 //stop the dispatcher thread
         _stopped.set(true);
     }
@@ -1750,7 +1775,7 @@
         }
     }
 
-    private void suspendChannel()
+    private void suspendChannel() throws AMQException
     {
         _logger.warn("Suspending channel");
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
@@ -1762,7 +1787,7 @@
         _connection.getProtocolHandler().writeRequest(_channelId, methodBody);
     }
 
-    private void unsuspendChannel()
+    private void unsuspendChannel() throws AMQException
     {
         _logger.warn("Unsuspending channel");
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)


Reply via email to