Author: cctrieloff
Date: Thu Oct  9 10:48:15 2008
New Revision: 703214

URL: http://svn.apache.org/viewvc?rev=703214&view=rev
Log:

Added options to perftest
- tx_sub & tx_pub so that they can be set independently
- tx_pub_async option to issue publisher tx select/commit asynchronously



Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=703214&r1=703213&r2=703214&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Thu Oct  9 10:48:15 
2008
@@ -97,7 +97,9 @@
     bool summary;
     uint32_t intervalSub;
     uint32_t intervalPub;
-    size_t tx;
+    size_t tx_pub;
+    bool tx_pub_async;
+    size_t tx_sub;
 
     static const std::string helpText;
     
@@ -107,7 +109,7 @@
         pubs(1), count(500000), size(1024), confirm(true), durable(false), 
uniqueData(false), syncPub(false),
         subs(1), ack(0),
         qt(1), iterations(1), mode(SHARED), summary(false),
-        intervalSub(0), intervalPub(0), tx(0)
+        intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), 
tx_sub(0)
     {
         addOptions()
             ("setup", optValue(setup), "Create shared queues.")
@@ -143,7 +145,9 @@
             ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between 
msg consume")
             ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between 
msg publish")
 
-            ("tx", optValue(tx, "N"), "if non-zero, the transaction batch 
size");
+            ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction 
batch size for publishing")
+            ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx 
commit async")
+            ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction 
batch size for consuming");
     }
 
     // Computed values
@@ -453,7 +457,13 @@
                 
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
 
 
-            if (opts.tx) sync(session).txSelect();
+            if (opts.tx_pub){ 
+                if (opts.tx_pub_async){
+                    session.txSelect();
+                } else {
+                    sync(session).txSelect();
+                }
+            }
             SubscriptionManager subs(session);
             LocalQueue lq;
             subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); 
@@ -478,7 +488,13 @@
                             arg::content=msg,
                             arg::acceptMode=1);
                     }
-                    if (opts.tx && ((i+1) % opts.tx == 0)) 
sync(session).txCommit();
+                    if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){
+                        if (opts.tx_pub_async){
+                            session.txCommit();
+                        } else {
+                            sync(session).txCommit();
+                        }
+                    }
                     if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
                 }
                 if (opts.confirm) session.sync();
@@ -488,7 +504,13 @@
                 // Send result to controller.
                 Message report(lexical_cast<string>(opts.count/time), 
"pub_done");
                 session.messageTransfer(arg::content=report, 
arg::acceptMode=1);
-                if (opts.tx) sync(session).txCommit();
+                if (opts.tx_pub){
+                    if (opts.tx_pub_async){
+                        session.txCommit();
+                    }else{
+                        sync(session).txCommit();
+                    }
+                }
             }
             session.close();
         }
@@ -530,16 +552,16 @@
 
     void run() {                // Subscribe
         try {            
-            if (opts.tx) sync(session).txSelect();
+            if (opts.tx_sub) sync(session).txSelect();
             SubscriptionManager subs(session);
-            LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
-            subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
+            LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack));
+            subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1);
             subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
                                 false);
             subs.subscribe(lq, queue);
             // Notify controller we are ready.
             session.messageTransfer(arg::content=Message("ready", 
"sub_ready"), arg::acceptMode=1);
-            if (opts.tx) sync(session).txCommit();
+            if (opts.tx_sub) sync(session).txCommit();
             
             for (size_t j = 0; j < opts.iterations; ++j) {
                 if (j > 0) {
@@ -551,7 +573,7 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
-                    if (opts.tx && ((i+1) % opts.tx == 0)) 
sync(session).txCommit();
+                    if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) 
sync(session).txCommit();
                     if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
                     // multiple publishers. Need an array of counters,
@@ -568,9 +590,9 @@
                         expect = n+1;
                     }
                 }
-                if (opts.tx || opts.ack)
+                if (opts.tx_sub || opts.ack)
                     lq.getAckPolicy().ackOutstanding(session); // Cumulative 
ack for final batch.
-                if (opts.tx)
+                if (opts.tx_sub)
                     sync(session).txCommit();
                 AbsTime end=now();
 
@@ -578,7 +600,7 @@
                 Message 
result(lexical_cast<string>(opts.subQuota/secs(start,end)),
                                "sub_done");
                 session.messageTransfer(arg::content=result, 
arg::acceptMode=1);
-                if (opts.tx) sync(session).txCommit();
+                if (opts.tx_sub) sync(session).txCommit();
             }
             session.close();
         }


Reply via email to