Author: rupertlssmith
Date: Wed Jan 23 02:21:10 2008
New Revision: 614481

URL: http://svn.apache.org/viewvc?rev=614481&view=rev
Log:
Qpid-755, Added session per connection variant to test, to check that durable 
subscription can be picked up by a fresh connection.

Modified:
    
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=614481&r1=614480&r2=614481&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 Wed Jan 23 02:21:10 2008
@@ -21,6 +21,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * @todo Code to check that a consumer gets only one particular method could 
be factored into a re-usable method (as
+ *       a static on a base test helper class, e.g. TestUtils.
+ *
+ * @todo Code to create test end-points using session per connection, or all 
sessions on one connection, to be factored
+ *       out to make creating this test variation simpler. Want to make this 
variation available through LocalCircuit,
+ *       driven by the test model.
+ */
 public class DurableSubscriptionTest extends TestCase
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(DurableSubscriptionTest.class);
@@ -105,9 +113,18 @@
         durabilityImpl(Session.AUTO_ACKNOWLEDGE);
     }
 
-    private void durabilityImpl(int ackMode) throws AMQException, 
JMSException, URLSyntaxException
+    public void testDurabilityNOACKSessionPerConnection() throws AMQException, 
JMSException, URLSyntaxException
     {
+        durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
+    }
 
+    public void testDurabilityAUTOACKSessionPerConnection() throws 
AMQException, JMSException, URLSyntaxException
+    {
+        durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private void durabilityImpl(int ackMode) throws AMQException, 
JMSException, URLSyntaxException
+    {
         AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", 
"test", "test");
         AMQTopic topic = new AMQTopic(con, "MyTopic");
         Session session1 = con.createSession(false, ackMode);
@@ -164,6 +181,88 @@
         consumer3.close();
 
         con.close();
+    }
+
+    private void durabilityImplSessionPerConnection(int ackMode) throws 
AMQException, JMSException, URLSyntaxException
+    {
+        Message msg;
+
+        // Create producer.
+        AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", 
"test", "test");
+        con0.start();
+        Session session0 = con0.createSession(false, ackMode);
+
+        AMQTopic topic = new AMQTopic(con0, "MyTopic");
+
+        Session sessionProd = con0.createSession(false, ackMode);
+        MessageProducer producer = sessionProd.createProducer(topic);
+
+        // Create consumer 1.
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", 
"test", "test");
+        con1.start();
+        Session session1 = con1.createSession(false, ackMode);
+
+        MessageConsumer consumer1 = session0.createConsumer(topic);
+
+        // Create consumer 2.
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"test", "test");
+        con2.start();
+        Session session2 = con2.createSession(false, ackMode);
+
+        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, 
"MySubscription");
+
+        // Send message and check that both consumers get it and only it.
+        producer.send(session0.createTextMessage("A"));
+
+        msg = consumer1.receive(500);
+        assertNotNull("Message should be available", msg);
+        assertEquals("Message Text doesn't match", "A", 
((TextMessage)msg).getText());
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on 
consumer1.", msg);
+
+        msg = consumer2.receive();
+        assertNotNull(msg);
+        assertEquals("Consumer 2 should also received the first msg.", "A", 
((TextMessage)msg).getText());
+        msg = consumer2.receive(500);
+        assertNull("There should be no more messages for consumption on 
consumer2.", msg);
+
+        // Detach the durable subscriber.
+        consumer2.close();
+        session2.close();
+        con2.close();
+
+        // Send message and receive on open consumer.
+        producer.send(session0.createTextMessage("B"));
+
+        _logger.info("Receive message on consumer 1 :expecting B");
+        msg = consumer1.receive(500);
+        assertNotNull("Consumer 1 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer1.", "B", 
((TextMessage)msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on 
consumer1.", msg);
+
+        // Re-attach a new consumer to the durable subscription, and check 
that it gets the message that it missed.
+        AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", 
"test", "test");
+        con3.start();
+        Session session3 = con3.createSession(false, ackMode);
+
+        TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, 
"MySubscription");
+
+        _logger.info("Receive message on consumer 3 :expecting B");
+        msg = consumer3.receive(500);
+        assertNotNull("Consumer 3 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer4.", "B", 
((TextMessage)msg).getText());
+        _logger.info("Receive message on consumer 3 :expecting null");
+        msg = consumer3.receive(500);
+        assertNull("There should be no more messages for consumption on 
consumer3.", msg);
+
+        consumer1.close();
+        consumer3.close();
+
+        con0.close();
+        con1.close();
+        con3.close();
     }
 
     public static junit.framework.Test suite()


Reply via email to