Author: gsim
Date: Wed Jan 16 07:18:34 2008
New Revision: 612478

URL: http://svn.apache.org/viewvc?rev=612478&view=rev
Log:
Add option to perftest to run for n iterations and print averages of all 
reported rates.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=612478&r1=612477&r2=612478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Jan 16 07:18:34 2008
@@ -87,6 +87,7 @@
 
     // General
     size_t qt;
+    size_t iterations;
     Mode mode;
     bool summary;
 
@@ -97,7 +98,7 @@
         setup(false), control(false), publish(false), subscribe(false),
         pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false),
         subs(1), ack(0),
-        qt(1), mode(SHARED), summary(false)
+        qt(1), iterations(1), mode(SHARED), summary(false)
     {
         addOptions()
             ("setup", optValue(setup), "Create shared queues.")
@@ -122,6 +123,7 @@
              "N==0: Subscriber uses unconfirmed mode")
             
             ("qt", optValue(qt, "N"), "Create N queues or topics.")
+            ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.")
             ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
 
             ("queue_max_count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
@@ -303,6 +305,20 @@
         if (!opts.summary) cout << " done." << endl;
     }
 
+    void process(size_t n, LocalQueue lq, string queue,
+                 boost::function<void (const string&)> msgFn)
+    {
+        session.messageFlow(queue, 0, n); 
+        if (!opts.summary) 
+            cout << "Processing " << n << " messages from "
+                 << queue << " " << flush;
+        for (size_t i = 0; i < n; ++i) {
+            if (!opts.summary) cout << "." << flush;
+            msgFn(lq.pop().getData());
+        }
+        if (!opts.summary) cout << " done." << endl;
+    }
+
     void send(size_t n, string queue, string data) {
         if (!opts.summary)
             cout << "Sending " << data << " " << n << " times to " << queue
@@ -317,35 +333,63 @@
             // Wait for subscribers to be ready.
             process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
 
-            Stats pubRates;
-            Stats subRates;
+            LocalQueue pubDone;
+            LocalQueue subDone;
+            subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
+            subs.subscribe(pubDone, "pub_done");
+            subs.subscribe(subDone, "sub_done");
+
+            double txrateTotal(0);
+            double mbytesTotal(0);
+            double pubRateTotal(0);
+            double subRateTotal(0);
+
+            for (size_t j = 0; j < opts.iterations; ++j) {
+                AbsTime start=now();
+                send(opts.totalPubs, "pub_start", "start"); // Start publishers
+
+                Stats pubRates;
+                Stats subRates;
+
+                process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates));
+                process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates));
+
+                AbsTime end=now(); 
+
+                double time=secs(start, end);
+                double txrate=opts.transfers/time;
+                double mbytes=(txrate*opts.size)/(1024*1024);
+                
+                if (!opts.summary) {
+                    cout << endl << "Total " << opts.transfers << " transfers of "
+                         << opts.size << " bytes in "
+                         << time << " seconds." << endl;
+                    cout << endl << "Publish transfers/sec:    " << endl;
+                    pubRates.print(cout);
+                    cout << endl << "Subscribe transfers/sec:  " << endl;
+                    subRates.print(cout);
+                    cout << endl
+                         << "Total transfers/sec:      " << txrate << endl
+                         << "Total Mbytes/sec: " << mbytes << endl;
+                }
+                else {
+                    cout << pubRates.mean() << "\t"
+                         << subRates.mean() << "\t"
+                         << txrate << "\t"
+                         << mbytes << endl;
+                }
 
-            AbsTime start=now();
-            send(opts.totalPubs, "pub_start", "start"); // Start publishers
-            process(opts.totalPubs, "pub_done", boost::ref(pubRates));
-            process(opts.totalSubs, "sub_done", boost::ref(subRates));
-            AbsTime end=now(); 
-            double time=secs(start, end);
-            double txrate=opts.transfers/time;
-            double mbytes=(txrate*opts.size)/(1024*1024);
-            
-            if (!opts.summary) {
-                cout << endl << "Total " << opts.transfers << " transfers of "
-                     << opts.size << " bytes in "
-                     << time << " seconds." << endl;
-                cout << endl << "Publish transfers/sec:    " << endl;
-                pubRates.print(cout);
-                cout << endl << "Subscribe transfers/sec:  " << endl;
-                subRates.print(cout);
-                cout << endl
-                     << "Total transfers/sec:      " << txrate << endl
-                     << "Total Mbytes/sec: " << mbytes << endl;
+                txrateTotal += txrate;
+                mbytesTotal += mbytes;
+                pubRateTotal += pubRates.mean();
+                subRateTotal += subRates.mean();
             }
-            else {
-                cout << pubRates.mean() << "\t"
-                     << subRates.mean() << "\t"
-                     << txrate << "\t"
-                     << mbytes << endl;
+            if (opts.iterations > 1) {
+                cout << "Averages: "<< endl
+                     << (pubRateTotal / opts.iterations) << "\t"
+                     << (subRateTotal / opts.iterations) << "\t"
+                     << (txrateTotal / opts.iterations) << "\t"
+                     << (mbytesTotal / opts.iterations) << endl;
             }
         }
         catch (const std::exception& e) {
@@ -394,29 +438,31 @@
 
 
             SubscriptionManager subs(session);
-            LocalQueue lq(AckPolicy(opts.ack));
-            subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false); 
+            LocalQueue lq;
+            subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); 
             subs.subscribe(lq, "pub_start"); 
-            expect(lq.pop().getData(), "start");
             
-            AbsTime start=now();
-            for (size_t i=0; i<opts.count; i++) {
-                // Stamp the iteration into the message data, avoid
-                // any heap allocation.
-                const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t));
-                completion = session.messageTransfer(
-                    arg::destination=destination,
-                    arg::content=msg,
-                    arg::confirmMode=opts.confirm);
+            for (size_t j = 0; j < opts.iterations; ++j) {
+                expect(lq.pop().getData(), "start");
+                AbsTime start=now();
+                for (size_t i=0; i<opts.count; i++) {
+                    // Stamp the iteration into the message data, avoid
+                    // any heap allocation.
+                    const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t),
+                                                                    reinterpret_cast<const char*>(&i), sizeof(uint32_t));
+                    completion = session.messageTransfer(
+                        arg::destination=destination,
+                        arg::content=msg,
+                        arg::confirmMode=opts.confirm);
+                }
+                if (opts.confirm) completion.sync();
+                AbsTime end=now();
+                double time=secs(start,end);
+                
+                // Send result to controller.
+                Message report(lexical_cast<string>(opts.count/time), "pub_done");
+                session.messageTransfer(arg::content=report);
             }
-            if (opts.confirm) completion.sync();
-            AbsTime end=now();
-            double time=secs(start,end);
-
-            // Send result to controller.
-            msg.setData(lexical_cast<string>(opts.count/time));
-            msg.getDeliveryProperties().setRoutingKey("pub_done");
-            session.messageTransfer(arg::content=msg);
             session.close();
         }
         catch (const std::exception& e) {
@@ -466,34 +512,41 @@
             // Notify controller we are ready.
             session.messageTransfer(arg::content=Message("ready", "sub_ready"));
 
-            Message msg;
-            AbsTime start=now();
-            size_t expect=0;
-            for (size_t i = 0; i < opts.subQuota; ++i) {
-                msg=lq.pop();
-                // TODO aconway 2007-11-23: check message order for. 
-                // multiple publishers. Need an acorray of counters,
-                // one per publisher and a publisher ID in the
-                // message. Careful not to introduce a lot of overhead
-                // here, e.g. no std::map, std::string etc.
-                //
-                // For now verify order only for a single publisher.
-                size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
-                size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset);
-                if (opts.pubs == 1) {
-                    if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
-                    else verify(n>=expect, ">=", expect, n);
-                    expect = n+1;
+            
+            for (size_t j = 0; j < opts.iterations; ++j) {
+                if (j > 0) {
+                    //need to allocate some more credit
+                    session.messageFlow(queue, 0, opts.subQuota); 
+                }
+                Message msg;
+                AbsTime start=now();
+                size_t expect=0;
+                for (size_t i = 0; i < opts.subQuota; ++i) {
+                    msg=lq.pop();
+                    // TODO aconway 2007-11-23: check message order for. 
+                    // multiple publishers. Need an acorray of counters,
+                    // one per publisher and a publisher ID in the
+                    // message. Careful not to introduce a lot of overhead
+                    // here, e.g. no std::map, std::string etc.
+                    //
+                    // For now verify order only for a single publisher.
+                    size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
+                    size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset);
+                    if (opts.pubs == 1) {
+                        if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
+                        else verify(n>=expect, ">=", expect, n);
+                        expect = n+1;
+                    }
                 }
+                if (opts.ack !=0)
+                    msg.acknowledge(); // Cumulative ack for final batch.
+                AbsTime end=now();
+
+                // Report to publisher.
+                Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
+                               "sub_done");
+                session.messageTransfer(arg::content=result);
             }
-            if (opts.ack !=0)
-                msg.acknowledge(); // Cumulative ack for final batch.
-            AbsTime end=now();
-
-            // Report to publisher.
-            Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
-                           "sub_done");
-            session.messageTransfer(arg::content=result);
             session.close();
         }
         catch (const std::exception& e) {


Reply via email to