Author: gsim
Date: Mon Dec 3 12:12:16 2007
New Revision: 600655
URL: http://svn.apache.org/viewvc?rev=600655&view=rev
Log:
Add option for durability to topic test
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=600655&r1=600654&r2=600655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Mon Dec 3 12:12:16 2007
@@ -74,11 +74,14 @@
struct Args : public qpid::TestOptions {
int ack;
bool transactional;
+ bool durable;
int prefetch;
- Args() : ack(0), transactional(false), prefetch(0) {
+
+ Args() : ack(0), transactional(false), durable(false), prefetch(0) {
addOptions()
("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
("transactional", optValue(transactional), "Use transactions")
+ ("durable", optValue(durable), "subscribers should use durable queues")
("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
}
};
@@ -107,7 +110,7 @@
//declare exchange, queue and bind them:
session.queueDeclare(arg::queue="response");
std::string control = "control_" + session.getId().str();
- session.queueDeclare(arg::queue=control);
+ session.queueDeclare(arg::queue=control, arg::durable=args.durable);
session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
//set up listener
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?rev=600655&r1=600654&r2=600655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Mon Dec 3 12:12:16 2007
@@ -59,6 +59,7 @@
Session_0_10& session;
const string controlTopic;
const bool transactional;
+ const bool durable;
Monitor monitor;
int count;
@@ -66,7 +67,7 @@
string generateData(int size);
public:
- Publisher(Session_0_10& session, const string& controlTopic, bool tx);
+ Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -78,23 +79,21 @@
struct Args : public TestOptions {
int messages;
int subscribers;
- int ack;
bool transactional;
- int prefetch;
+ bool durable;
int batches;
int delay;
int size;
Args() : messages(1000), subscribers(1),
- ack(500), transactional(false), prefetch(1000),
+ transactional(false), durable(false),
batches(1), delay(0), size(256)
{
addOptions()
("messages", optValue(messages, "N"), "how many messages to send")
("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
- ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
("transactional", optValue(transactional), "client should use transactions")
- ("prefetch", optValue(prefetch, "N"), "prefetch count")
+ ("durable", optValue(durable), "messages should be durable")
("batches", optValue(batches, "N"), "how many batches to run")
("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
("size", optValue(size, "BYTES"), "size of the published messages");
@@ -121,7 +120,7 @@
//set up listener
SubscriptionManager mgr(session);
- Publisher publisher(session, "topic_control", args.transactional);
+ Publisher publisher(session, "topic_control", args.transactional, args.durable);
mgr.subscribe(publisher, "response");
mgr.start();
@@ -158,8 +157,8 @@
return 1;
}
-Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) :
- session(_session), controlTopic(_controlTopic), transactional(tx){}
+Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
+ session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
void Publisher::received(Message& ){
//count responses and when all are received end the current batch
@@ -176,6 +175,9 @@
int64_t Publisher::publish(int msgs, int listeners, int size){
Message msg(generateData(size), controlTopic);
+ if (durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
AbsTime start = now();
{
Monitor::ScopedLock l(monitor);