Author: cctrieloff
Date: Thu Oct 9 10:48:15 2008
New Revision: 703214
URL: http://svn.apache.org/viewvc?rev=703214&view=rev
Log:
Added options to perftest
- tx_sub & tx_pub so that they can be set independantly
- async tx pub option
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=703214&r1=703213&r2=703214&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Thu Oct 9 10:48:15
2008
@@ -97,7 +97,9 @@
bool summary;
uint32_t intervalSub;
uint32_t intervalPub;
- size_t tx;
+ size_t tx_pub;
+ bool tx_pub_async;
+ size_t tx_sub;
static const std::string helpText;
@@ -107,7 +109,7 @@
pubs(1), count(500000), size(1024), confirm(true), durable(false),
uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0), tx(0)
+ intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false),
tx_sub(0)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -143,7 +145,9 @@
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between
msg consume")
("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between
msg publish")
- ("tx", optValue(tx, "N"), "if non-zero, the transaction batch
size");
+ ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction
batch size for publishing")
+ ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx
commit async")
+ ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction
batch size for consuming");
}
// Computed values
@@ -453,7 +457,13 @@
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- if (opts.tx) sync(session).txSelect();
+ if (opts.tx_pub){
+ if (opts.tx_pub_async){
+ session.txSelect();
+ } else {
+ sync(session).txSelect();
+ }
+ }
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
@@ -478,7 +488,13 @@
arg::content=msg,
arg::acceptMode=1);
}
- if (opts.tx && ((i+1) % opts.tx == 0))
sync(session).txCommit();
+ if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){
+ if (opts.tx_pub_async){
+ session.txCommit();
+ } else {
+ sync(session).txCommit();
+ }
+ }
if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
if (opts.confirm) session.sync();
@@ -488,7 +504,13 @@
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time),
"pub_done");
session.messageTransfer(arg::content=report,
arg::acceptMode=1);
- if (opts.tx) sync(session).txCommit();
+ if (opts.tx_pub){
+ if (opts.tx_pub_async){
+ session.txCommit();
+ }else{
+ sync(session).txCommit();
+ }
+ }
}
session.close();
}
@@ -530,16 +552,16 @@
void run() { // Subscribe
try {
- if (opts.tx) sync(session).txSelect();
+ if (opts.tx_sub) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
- subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack));
+ subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready",
"sub_ready"), arg::acceptMode=1);
- if (opts.tx) sync(session).txCommit();
+ if (opts.tx_sub) sync(session).txCommit();
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -551,7 +573,7 @@
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
- if (opts.tx && ((i+1) % opts.tx == 0))
sync(session).txCommit();
+ if (opts.tx_sub && ((i+1) % opts.tx_sub == 0))
sync(session).txCommit();
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -568,9 +590,9 @@
expect = n+1;
}
}
- if (opts.tx || opts.ack)
+ if (opts.tx_sub || opts.ack)
lq.getAckPolicy().ackOutstanding(session); // Cumulative
ack for final batch.
- if (opts.tx)
+ if (opts.tx_sub)
sync(session).txCommit();
AbsTime end=now();
@@ -578,7 +600,7 @@
Message
result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result,
arg::acceptMode=1);
- if (opts.tx) sync(session).txCommit();
+ if (opts.tx_sub) sync(session).txCommit();
}
session.close();
}