Author: aconway
Date: Thu Oct 16 13:15:03 2008
New Revision: 705347

URL: http://svn.apache.org/viewvc?rev=705347&view=rev
Log:
Extended dump consumer test to cover member death.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=705347&r1=705346&r2=705347&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Thu Oct 16 13:15:03 
2008
@@ -58,10 +58,10 @@
         }
     }
 
-    void kill() {
+    void kill(int sig=SIGINT) {
         using qpid::ErrnoException;
         if (pid == 0) return;
-        if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");
+        if (::kill(pid, sig) < 0) throw ErrnoException("kill failed");
         int status;
         if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for 
forked process failed");
         if (WEXITSTATUS(status) != 0)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=705347&r1=705346&r2=705347&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct 16 
13:15:03 2008
@@ -92,8 +92,8 @@
     void add0(bool force);
     void setup();
 
-    void kill(size_t n) {
-        if (n) forkedBrokers[n-1].kill();
+    void kill(size_t n, int sig=SIGINT) {
+        if (n) forkedBrokers[n-1].kill(sig);
         else broker0->broker->shutdown();
     }
 
@@ -218,7 +218,7 @@
     BOOST_CHECK_EQUAL(kb2,kb0);
     BOOST_CHECK_EQUAL(kb2,kb1);
 
-    cluster.kill(1);
+    cluster.kill(1,9);
     kb0 = knownBrokerPorts(c0.connection, 2);
     kb2 = knownBrokerPorts(c2.connection, 2);
     BOOST_CHECK_EQUAL(kb0.size(), 2);
@@ -226,34 +226,49 @@
 }
 
 QPID_AUTO_TEST_CASE(DumpConsumers) {
-    ClusterFixture cluster(1); 
-    Client c0(cluster[0], "c0"); 
-    c0.session.queueDeclare("q");
-    c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
-    c0.session.sync();
+    ClusterFixture cluster(1, false); // Don't init broker 0
 
-    // Start new members
     cluster.add();
     Client c1(cluster[1], "c1"); 
+    c1.session.queueDeclare("q");
+    c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+    c1.session.sync();
+
+    // Start new members
+    cluster.add0(true);
+    Client c0(cluster[0], "c0"); 
     cluster.add();
     Client c2(cluster[2], "c2"); 
 
     // Transfer a message, verify all members see it.
-    c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+    c1.session.messageTransfer(arg::content=Message("aaa", "q"));
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
 
-
     // Activate the subscription, ensure message removed on all queues. 
-    c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
+    c1.subs.setFlowControl("q", FlowControl::unlimited());
     Message m;
-    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+    BOOST_CHECK(c1.lq.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "aaa");
 
-    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Kill the subscribing member, ensure further messages are not removed.
+    {
+        ScopedSuppressLogging sl;
+        cluster.kill(1,9);
+        cluster.waitFor(2);
+        try { c1.connection.close(); }
+        catch (...) {}
+    }
+    for (int i = 0; i < 10; ++i) {
+        c0.session.messageTransfer(arg::content=Message("bbb", "q"));
+        BOOST_REQUIRE(c0.subs.get(m, "q", TIME_SEC));
+        BOOST_REQUIRE_EQUAL(m.getData(), "bbb");
+    }
 }
 
 QPID_AUTO_TEST_CASE(testCatchupSharedState) {


Reply via email to