Author: aidan
Date: Thu May 22 06:24:13 2008
New Revision: 659105
URL: http://svn.apache.org/viewvc?rev=659105&view=rev
Log:
QPID-1085: If an error occurs creating a durable subscriber with a selector,
delete the queue that was created.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu May 22 06:24:13 2008
@@ -28,8 +28,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,7 +58,6 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -80,12 +79,12 @@
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -927,12 +926,21 @@
checkNotClosed();
checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name,
_connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest,
consumer);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ try
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new
TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- return subscriber;
+ return subscriber;
+ }
+ catch (JMSException e)
+ {
+ deleteQueue(dest.getAMQQueueName());
+ throw e;
+ }
+
}
public MapMessage createMapMessage() throws JMSException
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Thu May 22 06:24:13 2008
@@ -130,18 +130,26 @@
{
checkNotClosed();
checkValidTopic(topic);
- if( _subscriptions.containsKey(name))
+ if (_subscriptions.containsKey(name))
{
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name,
_connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest,
consumer);
+ try
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new
TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ return subscriber;
+ }
+ catch (JMSException e)
+ {
+ deleteQueue(dest.getAMQQueueName());
+ throw e;
+ }
- return subscriber;
}
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=659105&r1=659104&r2=659105&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Thu May 22 06:24:13 2008
@@ -33,6 +33,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -273,6 +276,76 @@
con3.close();
}
+ /***
+ * This tests the fix for QPID-1085
+ * Creates a durable subscriber with an invalid selector, checks that the
+ * exception is thrown correctly and that the subscription is not created.
+ * @throws Exception
+ */
+ public void testDurableWithInvalidSelector() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn,
"MyTestDurableWithInvalidSelectorTopic");
+ MessageProducer producer = session.createProducer(topic);
+
producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
+ try
+ {
+ TopicSubscriber deadSubscriber =
session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub",
+
"=TEST
'test", true);
+ assertNull("Subscriber should not have been created",
deadSubscriber);
+ }
+ catch (InvalidSelectorException e)
+ {
+ // This was expected
+ }
+
+ TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic,
"testDurableWithInvalidSelectorSub");
+ assertNotNull("Subscriber should have been created", liveSubscriber);
+
+
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
+
+ Message msg = liveSubscriber.receive();
+ assertNotNull ("Message should have been received", msg);
+ assertEquals ("testDurableWithInvalidSelector2", ((TextMessage)
msg).getText());
+ assertNull("Should not receive subsequent message",
liveSubscriber.receive(200));
+ }
+
+ /***
+ * This tests the fix for QPID-1085
+ * Creates a durable subscriber with an invalid destination, checks that the
+ * exception is thrown correctly and that the subscription is not created.
+ * @throws Exception
+ */
+ public void testDurableWithInvalidDestination() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn,
"testDurableWithInvalidDestinationTopic");
+ try
+ {
+ TopicSubscriber deadSubscriber =
session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub");
+ assertNull("Subscriber should not have been created",
deadSubscriber);
+ }
+ catch (InvalidDestinationException e)
+ {
+ // This was expected
+ }
+ MessageProducer producer = session.createProducer(topic);
+
producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
+
+ TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic,
"testDurableWithInvalidDestinationsub");
+ assertNotNull("Subscriber should have been created", liveSubscriber);
+
+
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
+ Message msg = liveSubscriber.receive();
+ assertNotNull ("Message should have been received", msg);
+ assertEquals ("testDurableWithInvalidSelector2", ((TextMessage)
msg).getText());
+ assertNull("Should not receive subsequent message",
liveSubscriber.receive(200));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);