Author: ritchiem
Date: Mon Feb 19 05:48:17 2007
New Revision: 509202

URL: http://svn.apache.org/viewvc?view=rev&rev=509202
Log:
QPID-379  Bounced Messages do not appear in connection exception listener. 

The previous commit that started the Dispatcher was wrong and caused a lot of 
failures. This will address that problem by providing a thread pool on the 
client connection object to deliver bounced messages to the exception handler. 

Tidied up MessageListenerTests so all the asserts are in the given test.

Renamed TestChannelCloseMethodHandlerNoCloseOk as surefire picks it up as a 
test case.


Added:
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Mon Feb 19 05:48:17 2007
@@ -62,6 +62,9 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class AMQConnection extends Closeable implements Connection, 
QueueConnection, TopicConnection, Referenceable
 {
@@ -144,6 +147,9 @@
     private AMQShortString _temporaryTopicExchangeName = 
ExchangeDefaults.TOPIC_EXCHANGE_NAME;
     private AMQShortString _temporaryQueueExchangeName = 
ExchangeDefaults.DIRECT_EXCHANGE_NAME;
 
+    /** Thread Pool for executing connection level processes. Such as 
returning bounced messages. */
+    private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -716,8 +722,31 @@
             {
                 try
                 {
+                    long startCloseTime = System.currentTimeMillis();
+
+                    _taskPool.shutdown();
                     closeAllSessions(null, timeout);
+
+                    if (!_taskPool.isTerminated())
+                    {
+                        try
+                        {
+                            //adjust timeout
+                            long taskPoolTimeout = adjustTimeout(timeout, 
startCloseTime);
+
+                            _taskPool.awaitTermination(taskPoolTimeout , 
TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            _logger.info("Interrupted while shutting down 
connection thread pool.");
+                        }
+                    }
+
+                    //adjust timeout
+                    timeout = adjustTimeout(timeout, startCloseTime);
+
                     _protocolHandler.closeConnection(timeout);
+
                 }
                 catch (AMQException e)
                 {
@@ -727,6 +756,17 @@
         }
     }
 
+    private long adjustTimeout(long timeout, long startTime)
+    {
+        long now = System.currentTimeMillis();
+        timeout -= now - startTime;
+        if (timeout < 0)
+        {
+            timeout = 0;
+        }
+        return timeout;
+    }
+
     /**
      * Marks all sessions and their children as closed without sending any 
protocol messages. Useful when you need to
      * mark objects "visible" in userland as closed after failover or other 
significant event that impacts the
@@ -1146,5 +1186,10 @@
     public void setTemporaryQueueExchangeName(AMQShortString 
temporaryQueueExchangeName)
     {
         _temporaryQueueExchangeName = temporaryQueueExchangeName;
+    }
+
+    public void performConnectionTask(Runnable task)
+    {
+        _taskPool.execute(task);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Feb 19 05:48:17 2007
@@ -72,7 +72,6 @@
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@
 
     private boolean _hasMessageListeners;
 
-
     /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */
 
     private class Dispatcher extends Thread
@@ -277,42 +275,6 @@
 
                 }
             }
-            else
-            {
-                try
-                {
-                    // Bounced message is processed here, away from the mina 
thread
-                    AbstractJMSMessage bouncedMessage = 
_messageFactoryRegistry.createMessage(0,
-                                                                               
               false,
-                                                                               
               message.getBounceBody().exchange,
-                                                                               
               message.getBounceBody().routingKey,
-                                                                               
               message.getContentHeader(),
-                                                                               
               message.getBodies());
-
-                    AMQConstant errorCode = 
AMQConstant.getConstant(message.getBounceBody().replyCode);
-                    AMQShortString reason = message.getBounceBody().replyText;
-                    _logger.debug("Message returned with error code " + 
errorCode + " (" + reason + ")");
-
-                    //@TODO should this be moved to an exception handler of 
sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS)
-                    {
-                        _connection.exceptionReceived(new 
AMQNoConsumersException("Error: " + reason, bouncedMessage));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE)
-                    {
-                        _connection.exceptionReceived(new 
AMQNoRouteException("Error: " + reason, bouncedMessage));
-                    }
-                    else
-                    {
-                        _connection.exceptionReceived(new 
AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                    }
-
-                }
-                catch (Exception e)
-                {
-                    _logger.error("Caught exception trying to raise 
undelivered message exception (dump follows) - ignoring...", e);
-                }
-            }
         }
 
         public void close()
@@ -1384,7 +1346,7 @@
 
         if (topicName.indexOf('/') == -1)
         {
-            return new AMQTopic(getDefaultTopicExchangeName(),new 
AMQShortString(topicName));
+            return new AMQTopic(getDefaultTopicExchangeName(), new 
AMQShortString(topicName));
         }
         else
         {
@@ -1474,8 +1436,8 @@
             }
             // if the queue is bound to the exchange but NOT for this topic, 
then the JMS spec
             // says we must trash the subscription.
-            if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
-                !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), 
topicName))
+            if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+                !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), 
topicName))
             {
                 deleteQueue(dest.getAMQQueueName());
             }
@@ -1634,9 +1596,59 @@
                           + "] received in session with channel id " + 
_channelId);
         }
 
-        startDistpatcherIfNecessary();
+        if (message.getDeliverBody() == null)
+        {
+            // Return of the bounced message.
+            returnBouncedMessage(message);
+        }
+        else
+        {
+            _queue.add(message);
+        }
+    }
+
+    private void returnBouncedMessage(final UnprocessedMessage message)
+    {
+        _connection.performConnectionTask(
+                new Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            // Bounced message is processed here, away from 
the mina thread
+                            AbstractJMSMessage bouncedMessage = 
_messageFactoryRegistry.createMessage(0,
+                                                                               
                       false,
+                                                                               
                       message.getBounceBody().exchange,
+                                                                               
                       message.getBounceBody().routingKey,
+                                                                               
                       message.getContentHeader(),
+                                                                               
                       message.getBodies());
+
+                            AMQConstant errorCode = 
AMQConstant.getConstant(message.getBounceBody().replyCode);
+                            AMQShortString reason = 
message.getBounceBody().replyText;
+                            _logger.debug("Message returned with error code " 
+ errorCode + " (" + reason + ")");
+
+                            //@TODO should this be moved to an exception 
handler of sorts. Somewhere errors are converted to correct execeptions.
+                            if (errorCode == AMQConstant.NO_CONSUMERS)
+                            {
+                                _connection.exceptionReceived(new 
AMQNoConsumersException("Error: " + reason, bouncedMessage));
+                            }
+                            else if (errorCode == AMQConstant.NO_ROUTE)
+                            {
+                                _connection.exceptionReceived(new 
AMQNoRouteException("Error: " + reason, bouncedMessage));
+                            }
+                            else
+                            {
+                                _connection.exceptionReceived(new 
AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+                            }
 
-        _queue.add(message);
+                        }
+                        catch (Exception e)
+                        {
+                            _logger.error("Caught exception trying to raise 
undelivered message exception (dump follows) - ignoring...", e);
+                        }
+                    }
+                });
     }
 
     /**
@@ -1882,7 +1894,7 @@
         {
             throw new javax.jms.InvalidDestinationException("Cannot create a 
subscription on a temporary topic created in another session");
         }
-        if(!(topic instanceof AMQTopic))
+        if (!(topic instanceof AMQTopic))
         {
             throw new javax.jms.InvalidDestinationException("Cannot create a 
subscription on topic created for another JMS Provider, class of topic provided 
is: " + topic.getClass().getName());
         }
@@ -1915,7 +1927,6 @@
     {
         return _connection.getTemporaryQueueExchangeName();
     }
-
 
 
     public int getTicket()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
 Mon Feb 19 05:48:17 2007
@@ -42,16 +42,13 @@
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 
 /**
- * QPID-293 Setting MessageListener after connection has started can cause 
messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread 
take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have 
a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to 
pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the 
dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just 
poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent 
messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause 
messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in 
AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 
doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs 
to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will 
block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the 
first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the 
session will arrive first.
  */
 public class MessageListenerMultiConsumerTest extends TestCase
 {
@@ -66,7 +63,6 @@
     private MessageConsumer _consumer1;
     private MessageConsumer _consumer2;
 
-    private boolean _testAsync;
     private final CountDownLatch _allMessagesSent = new CountDownLatch(2); 
//all messages Sent Lock
 
     protected void setUp() throws Exception
@@ -116,16 +112,10 @@
 
         producerConnection.close();
 
-        _testAsync = false;
     }
 
     protected void tearDown() throws Exception
     {
-        //Should have recieved all async messages
-        if (_testAsync)
-        {
-            assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
-        }
         _clientConnection.close();
 
         super.tearDown();
@@ -161,8 +151,6 @@
 
     public void testAsynchronousRecieve() throws Exception
     {
-        _testAsync = true;
-
         _consumer1.setMessageListener(new MessageListener()
         {
             public void onMessage(Message message)
@@ -173,7 +161,7 @@
 
                 if (receivedCount1 == MSG_COUNT / 2)
                 {
-                    _allMessagesSent.countDown();
+                    _allMessagesSent.countDown();                    
                 }
 
             }
@@ -198,13 +186,14 @@
 
         try
         {
-            _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+            _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {
             //do nothing
         }
 
+        assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
     }
 
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
 Mon Feb 19 05:48:17 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.client;
 
 import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -38,18 +40,17 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
 
 /**
- * QPID-293 Setting MessageListener after connection has started can cause 
messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread 
take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a 
message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to 
pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the 
dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just 
poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent 
messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause 
messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in 
AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer 
doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs 
to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will 
block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the 
first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the 
session will arrive first.
  */
 public class MessageListenerTest extends TestCase implements MessageListener
 {
@@ -61,7 +62,7 @@
     private int receivedCount = 0;
     private MessageConsumer _consumer;
     private Connection _clientConnection;
-    private boolean _testAsync;
+    private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
 
     protected void setUp() throws Exception
     {
@@ -71,9 +72,9 @@
         InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
 
         Hashtable<String, String> env = new Hashtable<String, String>();
-        
+
         env.put("connectionfactory.connection", "amqp://client:[EMAIL 
PROTECTED]/test?brokerlist='vm://:1'");
-        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+        env.put("queue.queue", "MessageListenerTest");
 
         _context = factory.getInitialContext(env);
 
@@ -86,7 +87,6 @@
 
         Session clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-
         _consumer = clientSession.createConsumer(queue);
 
         //Create Producer
@@ -106,16 +106,10 @@
 
         producerConnection.close();
 
-        _testAsync = false;
     }
 
     protected void tearDown() throws Exception
     {
-        //Should have recieved all async messages
-        if (_testAsync)
-        {
-            assertEquals(MSG_COUNT, receivedCount);
-        }
         _clientConnection.close();
 
         super.tearDown();
@@ -125,7 +119,6 @@
 
     public void testSynchronousRecieve() throws Exception
     {
-
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             assertTrue(_consumer.receive(2000) != null);
@@ -134,21 +127,20 @@
 
     public void testAsynchronousRecieve() throws Exception
     {
-        _testAsync = true;
-
         _consumer.setMessageListener(this);
 
-
         _logger.info("Waiting 3 seconds for messages");
 
         try
         {
-            Thread.sleep(2000);
+            _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {
             //do nothing
         }
+        //Should have recieved all async messages
+        assertEquals(MSG_COUNT, receivedCount);
 
     }
 
@@ -157,6 +149,7 @@
         _logger.info("Received Message(" + receivedCount + "):" + message);
 
         receivedCount++;
+        _awaitMessages.countDown();
     }
 
     public static junit.framework.Test suite()

Added: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?view=auto&rev=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
 Mon Feb 19 05:48:17 2007
@@ -0,0 +1,96 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ChannelCloseMethodHandlerNoCloseOk implements 
StateAwareMethodListener
+{
+    private static final Logger _logger = 
Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
+
+    private static ChannelCloseMethodHandlerNoCloseOk _handler = new 
ChannelCloseMethodHandlerNoCloseOk();
+
+    public static ChannelCloseMethodHandlerNoCloseOk getInstance()
+    {
+        return _handler;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, 
AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.debug("ChannelClose method received");
+        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+        AMQShortString reason = method.replyText;
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Channel close reply code: " + errorCode + ", 
reason: " + reason);
+        }
+
+        // For this test Method Handler .. don't send Close-OK
+//        // TODO: Be aware of possible changes to parameter order as versions 
change.
+//        AMQFrame frame = 
ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), 
method.getMinor());
+//        protocolSession.writeFrame(frame);
+        if (errorCode != AMQConstant.REPLY_SUCCESS)
+        {
+            _logger.error("Channel close received with errorCode " + errorCode 
+ ", and reason " + reason);
+            if (errorCode == AMQConstant.NO_CONSUMERS)
+            {
+                throw new AMQNoConsumersException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.NO_ROUTE)
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR)
+            {
+                _logger.debug("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(String.valueOf(reason));
+            }
+            else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+            {
+                _logger.debug("Broker responded with Invalid Routing Key.");
+
+                throw new 
AMQInvalidRoutingKeyException(String.valueOf(reason));
+            }
+            else
+            {
+                throw new AMQChannelClosedException(errorCode, "Error: " + 
reason);
+            }
+
+        }
+        protocolSession.channelClosed(evt.getChannelId(), errorCode, 
String.valueOf(reason));
+    }
+}
\ No newline at end of file

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
 Mon Feb 19 05:48:17 2007
@@ -27,7 +27,6 @@
 import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
 import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
 import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
 import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
 import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
 import org.apache.qpid.client.handler.BasicReturnMethodHandler;
@@ -91,7 +90,7 @@
         //
         frame2handlerMap = new HashMap();
         // Use Test Handler for Close methods to not send Close-OKs
-        frame2handlerMap.put(ChannelCloseBody.class, 
TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+        frame2handlerMap.put(ChannelCloseBody.class, 
ChannelCloseMethodHandlerNoCloseOk.getInstance());
 
         frame2handlerMap.put(ChannelCloseOkBody.class, 
ChannelCloseOkMethodHandler.getInstance());
         frame2handlerMap.put(ConnectionCloseBody.class, 
ConnectionCloseMethodHandler.getInstance());


Reply via email to