Author: gsim
Date: Wed Oct 29 08:15:08 2008
New Revision: 708919

URL: http://svn.apache.org/viewvc?rev=708919&view=rev
Log:
* added flag to SubscriptionSettings to control automatic completion of message
* removed automatic acquiring under autoAck mode
* added test for results from acquire requests
* added short txtest to the set of system tests run under make check


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Wed Oct 29 
08:15:08 2008
@@ -42,8 +42,9 @@
  * is renewed.
  *
  * In "window mode" credit is automatically renewed when a message is
- * accepted. In non-window mode credit is not automatically renewed,
- * it must be explicitly re-set (@see Subscription)
+ * completed (which by default happens when it is accepted). In
+ * non-window mode credit is not automatically renewed, it must be
+ * explicitly re-set (@see Subscription)
  */
 struct FlowControl {
     static const uint32_t UNLIMITED=0xFFFFFFFF;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Wed Oct 
29 08:15:08 2008
@@ -27,6 +27,7 @@
 namespace client {
 
 using sys::Mutex;
+using framing::MessageAcquireResult;
 
 SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& 
q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
     : manager(m), name(n), queue(q), settings(s), listener(l)
@@ -68,16 +69,19 @@
 
 void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
     Mutex::ScopedLock l(lock);
-    manager.getSession().messageAcquire(messageIds);
-    unacquired.remove(messageIds);
+    MessageAcquireResult result = 
manager.getSession().messageAcquire(messageIds);
+    unacquired.remove(result.getTransfers());
     if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
-        unaccepted.add(messageIds);
+        unaccepted.add(result.getTransfers());
 }
 
 void SubscriptionImpl::accept(const SequenceSet& messageIds) {
     Mutex::ScopedLock l(lock);
     manager.getSession().messageAccept(messageIds);
     unaccepted.remove(messageIds);
+    if (settings.autoComplete) {
+        manager.getSession().sendCompletion();
+    }
 }
 
 Session SubscriptionImpl::getSession() const { return manager.getSession(); }
@@ -88,7 +92,6 @@
 
 void SubscriptionImpl::received(Message& m) {
     Mutex::ScopedLock l(lock);
-    manager.getSession().markCompleted(m.getId(), false, false);        
     if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) 
         unacquired.add(m.getId());
     else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
@@ -99,15 +102,16 @@
         listener->received(m);
     }
 
+    if (settings.autoComplete) {
+        manager.getSession().markCompleted(m.getId(), false, false);
+    }
     if (settings.autoAck) {
-        if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
-            if (unacquired.size()) {
-                async(manager.getSession()).messageAcquire(unacquired);
-                unaccepted.add(unacquired);
-                unaccepted.clear();
-            }
+        if (unaccepted.size() >= settings.autoAck) {
             async(manager.getSession()).messageAccept(unaccepted);
             unaccepted.clear();
+            if (settings.autoComplete) {
+                manager.getSession().sendCompletion();
+            }
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h Wed 
Oct 29 08:15:08 2008
@@ -39,22 +39,33 @@
         FlowControl flow=FlowControl::unlimited(),
         AcceptMode accept=ACCEPT_MODE_EXPLICIT,
         AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
-        unsigned int autoAck_=1
-    ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), 
autoAck(autoAck_) {}
+        unsigned int autoAck_=1,
+        bool autoComplete_=true
+    ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), 
autoAck(autoAck_), autoComplete(autoComplete_) {}
                          
     FlowControl flowControl;    ///@< Flow control settings. @see FlowControl
     AcceptMode acceptMode;      ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
     AcquireMode acquireMode;    ///@< ACQUIRE_MODE_PRE_ACQUIRED or 
ACQUIRE_MODE_NOT_ACQUIRED
 
-    /** Automatically acknowledge (acquire and accept) batches of autoAck 
messages.
-     * 0 means no automatic acknowledgement. What it means to "acknowledge" a 
message depends on
-     * acceptMode and acquireMode:
-     *  - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
-     *  - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" 
command
-     *  - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" 
command
-     *  - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" 
and "accept" commands
-     */
+    /** Automatically acknowledge (accept) batches of autoAck
+     *  messages. 0 means no automatic acknowledgement. This has no
+     *  effect for messsages received for a subscription with
+     *  ACCEPT_MODE_NODE.*/
     unsigned int autoAck;
+    /**
+     * If set to true, messages will be marked as completed (in
+     * windowing mode, completion of a message will cause the credit
+     * used up by that message to be reallocated) once they have been
+     * received. The server will be explicitly notified of all
+     * completed messages when the next accept is sent through the
+     * subscription (either explictly or through autAck). However the
+     * server may also periodically request information on the
+     * completed messages.
+     * 
+     * If set to false the application is responsible for completing
+     * messages (@see Session::markCompleted()).
+     */
+    bool autoComplete;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Oct 29 08:15:08 2008
@@ -134,7 +134,7 @@
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= 
BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
 
-system_tests = client_test quick_perftest quick_topictest run_header_test
+system_tests = client_test quick_perftest quick_topictest run_header_test 
quick_txtest
 TESTS += start_broker $(system_tests) python_tests stop_broker 
run_federation_tests run_acl_tests
 
 EXTRA_DIST +=                                                          \
@@ -142,6 +142,7 @@
   run-unit-tests start_broker python_tests stop_broker                         
\
   quick_topictest                                                      \
   quick_perftest                                                       \
+  quick_txtest                                                         \
   topictest                                                            \
   run_header_test                                                      \
   header_test.py                                                       \

Added: incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest?rev=708919&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest Wed Oct 29 08:15:08 
2008
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec `dirname $0`/run_test ./txtest --queues 4 --tx-count 10 --quiet

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp Wed Oct 29 08:15:08 2008
@@ -51,11 +51,12 @@
     uint txCount;
     uint totalMsgCount;
     bool dtx;
+    bool quiet;
 
     Args() : init(true), transfer(true), check(true), 
              size(256), durable(true), queues(2), 
              base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
-             dtx(false)
+             dtx(false), quiet(false)
     {
         addOptions()            
 
@@ -69,7 +70,8 @@
             ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages 
transferred per transaction")
             ("tx-count", optValue(txCount, "N"), "number of transactions per 
'agent'")
             ("total-messages", optValue(totalMsgCount, "N"), "total number of 
messages in 'circulation'")
-            ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
+            ("dtx", optValue(dtx, "yes|no"), "use distributed transactions")
+            ("quiet", optValue(quiet), "reduce output from test");
     }
 };
 
@@ -159,7 +161,6 @@
                     session.messageTransfer(arg::content=out, 
arg::acceptMode=1);
                 }
                 sub.accept(sub.getUnaccepted());
-                session.sendCompletion();
                 if (opts.dtx) {
                     session.dtxEnd(arg::xid=xid);
                     session.dtxPrepare(arg::xid=xid);
@@ -219,7 +220,7 @@
             StringSet::iterator next = i + 1;
             if (next == queues.end()) next = queues.begin();
 
-            std::cout << "Transfering from " << *i << " to " << *next << 
std::endl;
+            if (!opts.quiet) std::cout << "Transfering from " << *i << " to " 
<< *next << std::endl;
             agents.push_back(new Transfer(*i, *next));
             agents.back().thread = Thread(agents.back());
         }
@@ -241,13 +242,13 @@
             xidArr.collect(inDoubtXids);
 
             if (inDoubtXids.size()) {
-                std::cout << "Recovering DTX in-doubt transaction(s):" << 
std::endl;
+                if (!opts.quiet) std::cout << "Recovering DTX in-doubt 
transaction(s):" << std::endl;
                 framing::StructHelper decoder;
                 framing::Xid xid;
                 // abort even, commit odd transactions
                 for (unsigned i = 0; i < inDoubtXids.size(); i++) {
                     decoder.decode(xid, inDoubtXids[i]);
-                    std::cout << (i%2 ? " * aborting " : " * committing ");
+                    if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * 
committing ");
                     xid.print(std::cout);
                     std::cout << std::endl;
                     if (i%2) {
@@ -276,7 +277,7 @@
                 drained.push_back(m.getMessageProperties().getCorrelationId());
                 ++count;
             }
-            std::cout << "Drained " << count << " messages from " << *i << 
std::endl;
+            if (!opts.quiet) std::cout << "Drained " << count << " messages 
from " << *i << std::endl;
         }
 
         sort(ids.begin(), ids.end());


Reply via email to