Author: gsim
Date: Fri Jul 25 01:23:18 2008
New Revision: 679699

URL: http://svn.apache.org/viewvc?rev=679699&view=rev
Log:
QPID-447: Patch from David Sommerseth merged from r679689.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=679699&r1=679698&r2=679699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Fri Jul 25 
01:23:18 2008
@@ -76,13 +76,15 @@
     bool transactional;
     bool durable;
     int prefetch;
+    string statusqueue;
 
     Args() : ack(0), transactional(false), durable(false), prefetch(0) {
         addOptions()
             ("ack", optValue(ack, "MODE"), "Ack frequency in messages 
(defaults to half the prefetch value)")
             ("transactional", optValue(transactional), "Use transactions")
             ("durable", optValue(durable), "subscribers should use durable 
queues")
-            ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies 
no flow control, and no acking)");
+            ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies 
no flow control, and no acking)")
+            ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message 
queue to put status messages on");
     }
 };
 
@@ -102,9 +104,6 @@
             Connection connection;
             args.open(connection);
             AsyncSession session = connection.newSession();
-            if (args.transactional) {
-                session.txSelect();
-            }
 
             //declare exchange, queue and bind them:
             session.queueDeclare(arg::queue="response");
@@ -128,6 +127,17 @@
             }
             mgr.subscribe(listener, control);
             session.sync();
+
+            if( args.statusqueue.length() > 0 ) {
+                stringstream msg_str;
+                msg_str << "topic_listener: " << (int)getpid();
+                session.messageTransfer(arg::content=Message(msg_str.str(), 
args.statusqueue));
+                cout << "Ready status put on queue '" << args.statusqueue << 
"'" << endl;
+            }
+
+            if (args.transactional) {
+                session.txSelect();
+            }
             
             cout << "topic_listener: listening..." << endl;
             mgr.run();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=679699&r1=679698&r2=679699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Fri Jul 25 
01:23:18 2008
@@ -82,6 +82,7 @@
     int batches;
     int delay;
     int size;
+    string statusqueue;
 
     Args() : messages(1000), subscribers(1),
              transactional(false), durable(false),
@@ -94,7 +95,8 @@
             ("durable", optValue(durable), "messages should be durable")
             ("batches", optValue(batches, "N"), "how many batches to run")
             ("delay", optValue(delay, "SECONDS"), "Causes a delay between each 
batch")
-            ("size", optValue(size, "BYTES"), "size of the published 
messages");
+            ("size", optValue(size, "BYTES"), "size of the published messages")
+            ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message 
queue to read status messages from");
     }
 };
 
@@ -108,11 +110,28 @@
             Connection connection;
             args.open(connection);
             AsyncSession session = connection.newSession();
+
+            // If status-queue is defined, wait for all expected listeners to 
join in before we start
+            if( args.statusqueue.length() > 0 ) {
+                cout << "Waiting for " << args.subscribers << " listeners..." 
<< endl;
+                SubscriptionManager statusSubs(session);
+                LocalQueue statusQ;
+                statusSubs.subscribe(statusQ, args.statusqueue);
+                for (int i = 0; i < args.subscribers; i++) {
+                    Message m = statusQ.get();
+                    if( m.getData().find("topic_listener: ", 0) == 0 ) {
+                        cout << "Listener " << (i+1) << " of " << 
args.subscribers
+                             << " is ready (pid " << m.getData().substr(16, 
m.getData().length() - 16)
+                             << ")" << endl;                        
+                    } else {
+                        throw Exception(QPID_MSG("Unexpected message received 
on status queue: " << m.getData()));
+                    }                    
+                }
+            }
+
             if (args.transactional) {
                 session.txSelect();
             }
-
-
             session.queueDeclare(arg::queue="response");
             session.exchangeBind(arg::exchange="amq.direct", 
arg::queue="response", arg::bindingKey="response");
 
@@ -203,4 +222,3 @@
         session.txCommit();
     }
 }
-


Reply via email to