Author: gsim
Date: Mon Jun 9 13:53:57 2008
New Revision: 665890
URL: http://svn.apache.org/viewvc?rev=665890&view=rev
Log:
Moved from AccumulatedAck to SequenceSet in managing transactional accepts
Added transactional option to perftest
Removed clientid from ConnectionSettings as it appears not to be used
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Jun 9 13:53:57 2008
@@ -294,7 +294,6 @@
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAccept.cpp \
- qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
@@ -429,7 +428,6 @@
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
qpid/broker/TxAccept.h \
- qpid/broker/TxAck.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
qpid/broker/TxPublish.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Jun 9
13:53:57 2008
@@ -83,8 +83,8 @@
return id > tag;
}
-bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range)
const{
- return range->covers(id);
+bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
+ return range->contains(id);
}
void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -118,6 +118,8 @@
queue->requeue(msg);
acquired = false;
setEnded();
+ } else {
+ QPID_LOG(debug, "Ignoring release for " << id << " acquired=" <<
acquired << ", ended =" << ended);
}
}
@@ -130,6 +132,7 @@
if (acquired && !ended) {
queue->dequeue(ctxt, msg.payload);
setEnded();
+ QPID_LOG(debug, "Accepted " << id);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Jun 9
13:53:57 2008
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "Queue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -63,7 +63,7 @@
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const framing::AccumulatedAck* const range) const;
+ bool coveredBy(const framing::SequenceSet* const range) const;
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Mon Jun 9
13:53:57 2008
@@ -26,7 +26,7 @@
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const framing::AccumulatedAck& acked,
std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>&
unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending,
pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy),
&acked)));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Mon Jun 9 13:53:57
2008
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@
std::list<DeliveryRecord> pending;
public:
- DtxAck(const framing::AccumulatedAck& acked,
std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::SequenceSet& acked,
std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Jun 9
13:53:57 2008
@@ -28,7 +28,6 @@
#include "Queue.h"
#include "SessionContext.h"
#include "TxAccept.h"
-#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -63,7 +62,6 @@
prefetchCount(0),
tagGenerator("sgen"),
dtxSelected(false),
- accumulatedAck(0),
flowActive(true),
outputTasks(ss)
{
@@ -116,14 +114,12 @@
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for
use with transactions"));
- TxOp::shared_ptr txAck(completeOnCommit ?
- static_cast<TxOp*>(new TxAck(accumulatedAck,
unacked)) :
- static_cast<TxOp*>(new TxAccept(accumulatedAck,
unacked)));
+ TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck,
unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -377,59 +373,6 @@
}
-void SemanticState::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- {
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(),
boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in
range)
- end = find_if(start, unacked.end(),
boost::bind(&DeliveryRecord::after, _1, last));
- } else if (start != unacked.end()) {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first,
last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, copy the relevant slice from
- //unacked and record it against that transaction:
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- //then remove that slice from the unacked record:
- unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1,
&accumulatedAck));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1,
(TransactionContext*) 0));
- unacked.erase(start, end);
- }
- }//end of lock scope for delivery lock (TODO this is ugly, make it
prettier)
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- requestDispatch();
-}
-
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i !=
consumers.end(); i++) {
@@ -667,7 +610,7 @@
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.update(first, last);//TODO convert accumulatedAck to
SequenceSet
+ accumulatedAck.add(first, last);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Jun 9
13:53:57 2008
@@ -34,7 +34,7 @@
#include "TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/shared_ptr.h"
@@ -115,7 +115,7 @@
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
- framing::AccumulatedAck accumulatedAck;
+ framing::SequenceSet accumulatedAck;
bool flowActive;
boost::shared_ptr<Exchange> cacheExchange;
sys::AggregateOutput outputTasks;
@@ -125,7 +125,6 @@
bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
- void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
@@ -168,7 +167,7 @@
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool
ackExpected);
void startTx();
- void commit(MessageStore* const store, bool completeOnCommit);
+ void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -184,10 +183,6 @@
void handle(boost::intrusive_ptr<Message> msg);
bool doOutput() { return outputTasks.doOutput(); }
- //preview only (completed == ack):
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
-
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
void accepted(DeliveryId deliveryTag, DeliveryId endTag);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Jun 9
13:53:57 2008
@@ -464,7 +464,7 @@
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore(), false);
+ state.commit(&getBroker().getStore());
}
void SessionAdapter::TxHandlerImpl::rollback()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Mon Jun 9
13:53:57 2008
@@ -25,9 +25,9 @@
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceSet;
-TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>&
_unacked) :
+TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked) {}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Mon Jun 9
13:53:57 2008
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@
* a transactional channel.
*/
class TxAccept : public TxOp{
- framing::AccumulatedAck& acked;
+ framing::SequenceSet& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::AccumulatedAck& acked,
std::list<DeliveryRecord>& unacked);
+ TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>&
unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Mon
Jun 9 13:53:57 2008
@@ -32,7 +32,6 @@
ConnectionSettings::ConnectionSettings() :
host("localhost"),
port(TcpAddress::DEFAULT_PORT),
- clientid("cpp"),
username("guest"),
password("guest"),
mechanism("PLAIN"),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Mon Jun
9 13:53:57 2008
@@ -60,7 +60,6 @@
*/
std::string virtualhost;
- std::string clientid;
/**
* The username to use when authenticating the connection.
*/
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h Mon Jun 9
13:53:57 2008
@@ -38,7 +38,6 @@
("broker,b", optValue(host, "HOST"), "Broker host to connect to")
("port,p", optValue(port, "PORT"), "Broker port to connect to")
("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
- ("clientname,n", optValue(clientid, "ID"), "unique client
identifier")
("username", optValue(username, "USER"), "user name for broker log
in.")
("password", optValue(password, "PASSWORD"), "password for broker
log in.")
("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use
when authenticating.")
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Jun 9 13:53:57 2008
@@ -59,7 +59,6 @@
SequenceNumberTest.cpp \
TimerTest.cpp \
TopicExchangeTest.cpp \
- TxAckTest.cpp \
TxBufferTest.cpp \
TxPublishTest.cpp \
MessageBuilderTest.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Mon Jun 9 13:53:57
2008
@@ -95,8 +95,9 @@
size_t iterations;
Mode mode;
bool summary;
- uint32_t intervalSub;
- uint32_t intervalPub;
+ uint32_t intervalSub;
+ uint32_t intervalPub;
+ size_t tx;
static const std::string helpText;
@@ -106,7 +107,7 @@
pubs(1), count(500000), size(1024), confirm(true), durable(false),
uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0)
+ intervalSub(0), intervalPub(0), tx(0)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -140,7 +141,9 @@
("queue-durable", optValue(queueDurable, "N"), "Make queue durable
(implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between
msg consume")
- ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between
msg publish");
+ ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between
msg publish")
+
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch
size");
}
// Computed values
@@ -450,6 +453,7 @@
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
@@ -474,6 +478,7 @@
arg::content=msg,
arg::acceptMode=1);
}
+ if (opts.tx && (j % opts.tx == 0))
sync(session).txCommit();
if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
if (opts.confirm) session.sync();
@@ -483,6 +488,7 @@
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time),
"pub_done");
session.messageTransfer(arg::content=report,
arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}
@@ -523,16 +529,17 @@
}
void run() { // Subscribe
- try {
+ try {
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
+ subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready",
"sub_ready"), arg::acceptMode=1);
-
+ if (opts.tx) sync(session).txCommit();
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -544,6 +551,7 @@
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
+ if (opts.tx && (i % opts.tx == 0))
sync(session).txCommit();
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -560,14 +568,17 @@
expect = n+1;
}
}
- if (opts.ack !=0)
+ if (opts.ack)
subs.getAckPolicy().ackOutstanding(session); // Cumulative
ack for final batch.
+ if (opts.tx)
+ sync(session).txCommit();
AbsTime end=now();
// Report to publisher.
Message
result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result,
arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}