Author: gsim
Date: Mon Jun  9 13:53:57 2008
New Revision: 665890

URL: http://svn.apache.org/viewvc?rev=665890&view=rev
Log:
Moved from AccumulatedAck to SequenceSet for managing transactional accepts.
Added a transactional option (--tx, the transaction batch size) to perftest.
Removed clientid from ConnectionSettings, as it appears to be unused.


Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Jun  9 13:53:57 2008
@@ -294,7 +294,6 @@
   qpid/broker/Timer.cpp \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TxAccept.cpp \
-  qpid/broker/TxAck.cpp \
   qpid/broker/TxBuffer.cpp \
   qpid/broker/TxPublish.cpp \
   qpid/broker/Vhost.cpp \
@@ -429,7 +428,6 @@
   qpid/broker/TopicExchange.h \
   qpid/broker/TransactionalStore.h \
   qpid/broker/TxAccept.h \
-  qpid/broker/TxAck.h \
   qpid/broker/TxBuffer.h \
   qpid/broker/TxOp.h \
   qpid/broker/TxPublish.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Jun  9 
13:53:57 2008
@@ -83,8 +83,8 @@
     return id > tag;
 }
 
-bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) 
const{
-    return range->covers(id);
+bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
+    return range->contains(id);
 }
 
 void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -118,6 +118,8 @@
         queue->requeue(msg);
         acquired = false;
         setEnded();
+    } else {
+        QPID_LOG(debug, "Ignoring release for " << id << " acquired=" << 
acquired << ", ended =" << ended);
     }
 }
 
@@ -130,6 +132,7 @@
     if (acquired && !ended) {
         queue->dequeue(ctxt, msg.payload);
         setEnded();
+        QPID_LOG(debug, "Accepted " << id);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Jun  9 
13:53:57 2008
@@ -25,7 +25,7 @@
 #include <list>
 #include <vector>
 #include <ostream>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
 #include "Queue.h"
 #include "Consumer.h"
 #include "DeliveryId.h"
@@ -63,7 +63,7 @@
     bool matches(DeliveryId tag) const;
     bool matchOrAfter(DeliveryId tag) const;
     bool after(DeliveryId tag) const;
-    bool coveredBy(const framing::AccumulatedAck* const range) const;
+    bool coveredBy(const framing::SequenceSet* const range) const;
 
     void dequeue(TransactionContext* ctxt = 0) const;
     void requeue() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Mon Jun  9 
13:53:57 2008
@@ -26,7 +26,7 @@
 using std::mem_fun_ref;
 using namespace qpid::broker;
 
-DtxAck::DtxAck(const framing::AccumulatedAck& acked, 
std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& 
unacked)
 {
     remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, 
pending.end()), 
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), 
&acked)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Mon Jun  9 13:53:57 
2008
@@ -24,7 +24,7 @@
 #include <algorithm>
 #include <functional>
 #include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
 #include "DeliveryRecord.h"
 #include "TxOp.h"
 
@@ -34,7 +34,7 @@
             std::list<DeliveryRecord> pending;
 
         public:
-            DtxAck(const framing::AccumulatedAck& acked, 
std::list<DeliveryRecord>& unacked);
+            DtxAck(const framing::SequenceSet& acked, 
std::list<DeliveryRecord>& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Jun  9 
13:53:57 2008
@@ -28,7 +28,6 @@
 #include "Queue.h"
 #include "SessionContext.h"
 #include "TxAccept.h"
-#include "TxAck.h"
 #include "TxPublish.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
@@ -63,7 +62,6 @@
       prefetchCount(0),
       tagGenerator("sgen"),
       dtxSelected(false),
-      accumulatedAck(0),
       flowActive(true),
       outputTasks(ss)
 {
@@ -116,14 +114,12 @@
     txBuffer = TxBuffer::shared_ptr(new TxBuffer());
 }
 
-void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
+void SemanticState::commit(MessageStore* const store)
 {
     if (!txBuffer) throw
         CommandInvalidException(QPID_MSG("Session has not been selected for 
use with transactions"));
 
-    TxOp::shared_ptr txAck(completeOnCommit ? 
-                           static_cast<TxOp*>(new TxAck(accumulatedAck, 
unacked)) : 
-                           static_cast<TxOp*>(new TxAccept(accumulatedAck, 
unacked)));
+    TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, 
unacked)));
     txBuffer->enlist(txAck);
     if (txBuffer->commitLocal(store)) {
         accumulatedAck.clear();
@@ -377,59 +373,6 @@
 
 }
 
-void SemanticState::ackCumulative(DeliveryId id)
-{
-    ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
-    ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
-    {
-        ack_iterator start = cumulative ? unacked.begin() : 
-            find_if(unacked.begin(), unacked.end(), 
boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
-        ack_iterator end = start;
-        
-        if (cumulative || first != last) {
-            //need to find end (position it just after the last record in 
range)
-            end = find_if(start, unacked.end(), 
boost::bind(&DeliveryRecord::after, _1, last));
-        } else if (start != unacked.end()) {
-            //just acked single element (move end past it)
-            ++end;
-        }
-        
-        for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
-        
-        if (txBuffer.get()) {
-            //in transactional mode, don't dequeue or remove, just
-            //maintain set of acknowledged messages:
-            accumulatedAck.update(cumulative ? accumulatedAck.mark : first, 
last);
-            
-            if (dtxBuffer.get()) {
-                //if enlisted in a dtx, copy the relevant slice from
-                //unacked and record it against that transaction:
-                TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
-                //then remove that slice from the unacked record:
-                unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, 
&accumulatedAck));
-                accumulatedAck.clear();
-                dtxBuffer->enlist(txAck);    
-            }
-        } else {
-            for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, 
(TransactionContext*) 0));
-            unacked.erase(start, end);
-        }
-    }//end of lock scope for delivery lock (TODO this is ugly, make it 
prettier)
-    
-    //if the prefetch limit had previously been reached, or credit
-    //had expired in windowing mode there may be messages that can
-    //be now be delivered
-    requestDispatch();
-}
-
 void SemanticState::requestDispatch()
 {    
     for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
@@ -667,7 +610,7 @@
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
-        accumulatedAck.update(first, last);//TODO convert accumulatedAck to 
SequenceSet
+        accumulatedAck.add(first, last);
         
         if (dtxBuffer.get()) {
             //if enlisted in a dtx, copy the relevant slice from

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Jun  9 
13:53:57 2008
@@ -34,7 +34,7 @@
 #include "TxBuffer.h"
 
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/shared_ptr.h"
@@ -115,7 +115,7 @@
     DtxBuffer::shared_ptr dtxBuffer;
     bool dtxSelected;
     DtxBufferMap suspendedXids;
-    framing::AccumulatedAck accumulatedAck;
+    framing::SequenceSet accumulatedAck;
     bool flowActive;
     boost::shared_ptr<Exchange> cacheExchange;
     sys::AggregateOutput outputTasks;
@@ -125,7 +125,6 @@
     bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
     void checkDtxTimeout();
     ConsumerImpl& find(const std::string& destination);
-    void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
     void complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
@@ -168,7 +167,7 @@
 
     bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool 
ackExpected);
     void startTx();
-    void commit(MessageStore* const store, bool completeOnCommit);
+    void commit(MessageStore* const store);
     void rollback();
     void selectDtx();
     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -184,10 +183,6 @@
     void handle(boost::intrusive_ptr<Message> msg);
     bool doOutput() { return outputTasks.doOutput(); }
 
-    //preview only (completed == ack):
-    void ackCumulative(DeliveryId deliveryTag);
-    void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
-
     //final 0-10 spec (completed and accepted are distinct):
     void completed(DeliveryId deliveryTag, DeliveryId endTag);
     void accepted(DeliveryId deliveryTag, DeliveryId endTag);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Jun  9 
13:53:57 2008
@@ -464,7 +464,7 @@
 
 void SessionAdapter::TxHandlerImpl::commit()
 {
-    state.commit(&getBroker().getStore(), false);
+    state.commit(&getBroker().getStore());
 }
 
 void SessionAdapter::TxHandlerImpl::rollback()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Mon Jun  9 
13:53:57 2008
@@ -25,9 +25,9 @@
 using std::bind2nd;
 using std::mem_fun_ref;
 using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceSet;
 
-TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& 
_unacked) : 
+TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : 
     acked(_acked), unacked(_unacked) {}
 
 bool TxAccept::prepare(TransactionContext* ctxt) throw()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Mon Jun  9 
13:53:57 2008
@@ -24,7 +24,7 @@
 #include <algorithm>
 #include <functional>
 #include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
 #include "DeliveryRecord.h"
 #include "TxOp.h"
 
@@ -35,7 +35,7 @@
          * a transactional channel.
          */
         class TxAccept : public TxOp{
-            framing::AccumulatedAck& acked;
+            framing::SequenceSet& acked;
             std::list<DeliveryRecord>& unacked;
 
         public:
@@ -44,7 +44,7 @@
              * acks received
              * @param unacked the record of delivered messages
              */
-            TxAccept(framing::AccumulatedAck& acked, 
std::list<DeliveryRecord>& unacked);
+            TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& 
unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Mon 
Jun  9 13:53:57 2008
@@ -32,7 +32,6 @@
 ConnectionSettings::ConnectionSettings() :
     host("localhost"), 
     port(TcpAddress::DEFAULT_PORT),
-    clientid("cpp"), 
     username("guest"), 
     password("guest"),
     mechanism("PLAIN"),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Mon Jun  
9 13:53:57 2008
@@ -60,7 +60,6 @@
      */
     std::string virtualhost;
 
-    std::string clientid;
     /**
      * The username to use when authenticating the connection.
      */

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h Mon Jun  9 
13:53:57 2008
@@ -38,7 +38,6 @@
             ("broker,b", optValue(host, "HOST"), "Broker host to connect to") 
             ("port,p", optValue(port, "PORT"), "Broker port to connect to")
             ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
-            ("clientname,n", optValue(clientid, "ID"), "unique client 
identifier")
             ("username", optValue(username, "USER"), "user name for broker log 
in.")
             ("password", optValue(password, "PASSWORD"), "password for broker 
log in.")
             ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use 
when authenticating.")

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Jun  9 13:53:57 2008
@@ -59,7 +59,6 @@
        SequenceNumberTest.cpp \
        TimerTest.cpp \
        TopicExchangeTest.cpp \
-       TxAckTest.cpp \
        TxBufferTest.cpp \
        TxPublishTest.cpp \
        MessageBuilderTest.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=665890&r1=665889&r2=665890&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Mon Jun  9 13:53:57 
2008
@@ -95,8 +95,9 @@
     size_t iterations;
     Mode mode;
     bool summary;
-       uint32_t intervalSub;
-       uint32_t intervalPub;
+    uint32_t intervalSub;
+    uint32_t intervalPub;
+    size_t tx;
 
     static const std::string helpText;
     
@@ -106,7 +107,7 @@
         pubs(1), count(500000), size(1024), confirm(true), durable(false), 
uniqueData(false), syncPub(false),
         subs(1), ack(0),
         qt(1), iterations(1), mode(SHARED), summary(false),
-               intervalSub(0), intervalPub(0)
+        intervalSub(0), intervalPub(0), tx(0)
     {
         addOptions()
             ("setup", optValue(setup), "Create shared queues.")
@@ -140,7 +141,9 @@
             ("queue-durable", optValue(queueDurable, "N"), "Make queue durable 
(implied if durable set)")
 
             ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between 
msg consume")
-            ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between 
msg publish");
+            ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between 
msg publish")
+
+            ("tx", optValue(tx, "N"), "if non-zero, the transaction batch 
size");
     }
 
     // Computed values
@@ -450,6 +453,7 @@
                 
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
 
 
+            if (opts.tx) sync(session).txSelect();
             SubscriptionManager subs(session);
             LocalQueue lq;
             subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); 
@@ -474,6 +478,7 @@
                             arg::content=msg,
                             arg::acceptMode=1);
                     }
+                    if (opts.tx && (j % opts.tx == 0)) 
sync(session).txCommit();
                     if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
                 }
                 if (opts.confirm) session.sync();
@@ -483,6 +488,7 @@
                 // Send result to controller.
                 Message report(lexical_cast<string>(opts.count/time), 
"pub_done");
                 session.messageTransfer(arg::content=report, 
arg::acceptMode=1);
+                if (opts.tx) sync(session).txCommit();
             }
             session.close();
         }
@@ -523,16 +529,17 @@
     }
 
     void run() {                // Subscribe
-        try {
+        try {            
+            if (opts.tx) sync(session).txSelect();
             SubscriptionManager subs(session);
-            LocalQueue lq(AckPolicy(opts.ack));
-            subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
+            LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
+            subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
             subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
                                 false);
             subs.subscribe(lq, queue);
             // Notify controller we are ready.
             session.messageTransfer(arg::content=Message("ready", 
"sub_ready"), arg::acceptMode=1);
-
+            if (opts.tx) sync(session).txCommit();
             
             for (size_t j = 0; j < opts.iterations; ++j) {
                 if (j > 0) {
@@ -544,6 +551,7 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
+                    if (opts.tx && (i % opts.tx == 0)) 
sync(session).txCommit();
                     if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
                     // multiple publishers. Need an array of counters,
@@ -560,14 +568,17 @@
                         expect = n+1;
                     }
                 }
-                if (opts.ack !=0)
+                if (opts.ack)
                     subs.getAckPolicy().ackOutstanding(session); // Cumulative 
ack for final batch.
+                if (opts.tx)
+                    sync(session).txCommit();
                 AbsTime end=now();
 
                 // Report to publisher.
                 Message 
result(lexical_cast<string>(opts.subQuota/secs(start,end)),
                                "sub_done");
                 session.messageTransfer(arg::content=result, 
arg::acceptMode=1);
+                if (opts.tx) sync(session).txCommit();
             }
             session.close();
         }


Reply via email to