Author: aidan
Date: Thu May 22 06:24:13 2008
New Revision: 659105

URL: http://svn.apache.org/viewvc?rev=659105&view=rev
Log:
QPID-1085: If an error occurs creating a durable subscriber with a selector 
delete the queue that was created.

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu May 22 06:24:13 2008
@@ -28,8 +28,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -58,7 +58,6 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -80,12 +79,12 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -927,12 +926,21 @@
         checkNotClosed();
         checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, 
_connection);
-        BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal);
-        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, 
consumer);
-        _subscriptions.put(name, subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+        try
+        {
+            BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal);
+            TopicSubscriberAdaptor subscriber = new 
TopicSubscriberAdaptor(dest, consumer);
+            _subscriptions.put(name, subscriber);
+            _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
-        return subscriber;
+            return subscriber;
+        }
+        catch (JMSException e)
+        {
+            deleteQueue(dest.getAMQQueueName());
+            throw e;
+        }
+        
     }
 
     public MapMessage createMapMessage() throws JMSException

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Thu May 22 06:24:13 2008
@@ -130,18 +130,26 @@
     {
         checkNotClosed();
         checkValidTopic(topic);
-        if( _subscriptions.containsKey(name))
+        if (_subscriptions.containsKey(name))
         {
             _subscriptions.get(name).close();
         }
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, 
_connection);
-        BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal);
-        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, 
consumer);
+        try
+        {
+            BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal);
+            TopicSubscriberAdaptor subscriber = new 
TopicSubscriberAdaptor(dest, consumer);
+            _subscriptions.put(name, subscriber);
+            _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
-        _subscriptions.put(name, subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+            return subscriber;
+        }
+        catch (JMSException e)
+        {
+            deleteQueue(dest.getAMQQueueName());
+            throw e;
+        }
 
-        return subscriber;
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 Thu May 22 06:24:13 2008
@@ -33,6 +33,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -273,6 +276,76 @@
         con3.close();
     }
 
+    /***
+     * This tests the fix for QPID-1085
+     * Creates a durable subscriber with an invalid selector, checks that the
+     * exception is thrown correctly and that the subscription is not created. 
+     * @throws Exception 
+     */
+    public void testDurableWithInvalidSelector() throws Exception
+    {
+       Connection conn = getConnection();
+       conn.start();
+       Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       AMQTopic topic = new AMQTopic((AMQConnection) conn, 
"MyTestDurableWithInvalidSelectorTopic");
+       MessageProducer producer = session.createProducer(topic);
+       
producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
+       try 
+       {
+               TopicSubscriber deadSubscriber = 
session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub",
+                                                                               
                                                                         "=TEST 
'test", true);
+               assertNull("Subscriber should not have been created", 
deadSubscriber);
+       } 
+       catch (InvalidSelectorException e)
+       {
+               // This was expected
+       }
+       
+       TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, 
"testDurableWithInvalidSelectorSub");
+       assertNotNull("Subscriber should have been created", liveSubscriber);
+
+       
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
+       
+       Message msg = liveSubscriber.receive();
+       assertNotNull ("Message should have been received", msg);
+       assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) 
msg).getText());
+       assertNull("Should not receive subsequent message", 
liveSubscriber.receive(200));
+    }
+    
+    /***
+     * This tests the fix for QPID-1085
+     * Creates a durable subscriber with an invalid destination, checks that 
the
+     * exception is thrown correctly and that the subscription is not created. 
+     * @throws Exception 
+     */
+    public void testDurableWithInvalidDestination() throws Exception
+    {
+       Connection conn = getConnection();
+       conn.start();
+       Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       AMQTopic topic = new AMQTopic((AMQConnection) conn, 
"testDurableWithInvalidDestinationTopic");
+       try 
+       {
+               TopicSubscriber deadSubscriber = 
session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub");
+               assertNull("Subscriber should not have been created", 
deadSubscriber);
+       } 
+       catch (InvalidDestinationException e)
+       {
+               // This was expected
+       }
+       MessageProducer producer = session.createProducer(topic);       
+       
producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
+       
+       TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, 
"testDurableWithInvalidDestinationsub");
+       assertNotNull("Subscriber should have been created", liveSubscriber);
+       
+       
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
+       Message msg = liveSubscriber.receive();
+       assertNotNull ("Message should have been received", msg);
+       assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) 
msg).getText());
+       assertNull("Should not receive subsequent message", 
liveSubscriber.receive(200));
+    }
+    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(DurableSubscriptionTest.class);


Reply via email to