Author: gsim
Date: Wed Oct 29 08:15:08 2008
New Revision: 708919
URL: http://svn.apache.org/viewvc?rev=708919&view=rev
Log:
* added flag to SubscriptionSettings to control automatic completion of message
* removed automatic acquiring under autoAck mode
* added test for results from acquire requests
* added short txtest to the set of system tests run under make check
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Wed Oct 29
08:15:08 2008
@@ -42,8 +42,9 @@
* is renewed.
*
* In "window mode" credit is automatically renewed when a message is
- * accepted. In non-window mode credit is not automatically renewed,
- * it must be explicitly re-set (@see Subscription)
+ * completed (which by default happens when it is accepted). In
+ * non-window mode credit is not automatically renewed, it must be
+ * explicitly re-set (@see Subscription)
*/
struct FlowControl {
static const uint32_t UNLIMITED=0xFFFFFFFF;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Wed Oct
29 08:15:08 2008
@@ -27,6 +27,7 @@
namespace client {
using sys::Mutex;
+using framing::MessageAcquireResult;
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string&
q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
@@ -68,16 +69,19 @@
void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
- manager.getSession().messageAcquire(messageIds);
- unacquired.remove(messageIds);
+ MessageAcquireResult result =
manager.getSession().messageAcquire(messageIds);
+ unacquired.remove(result.getTransfers());
if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
- unaccepted.add(messageIds);
+ unaccepted.add(result.getTransfers());
}
void SubscriptionImpl::accept(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
@@ -88,7 +92,6 @@
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
- manager.getSession().markCompleted(m.getId(), false, false);
if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
@@ -99,15 +102,16 @@
listener->received(m);
}
+ if (settings.autoComplete) {
+ manager.getSession().markCompleted(m.getId(), false, false);
+ }
if (settings.autoAck) {
- if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
- if (unacquired.size()) {
- async(manager.getSession()).messageAcquire(unacquired);
- unaccepted.add(unacquired);
- unaccepted.clear();
- }
+ if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
unaccepted.clear();
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h Wed
Oct 29 08:15:08 2008
@@ -39,22 +39,33 @@
FlowControl flow=FlowControl::unlimited(),
AcceptMode accept=ACCEPT_MODE_EXPLICIT,
AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
- unsigned int autoAck_=1
- ) : flowControl(flow), acceptMode(accept), acquireMode(acquire),
autoAck(autoAck_) {}
+ unsigned int autoAck_=1,
+ bool autoComplete_=true
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire),
autoAck(autoAck_), autoComplete(autoComplete_) {}
FlowControl flowControl; ///@< Flow control settings. @see FlowControl
AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or
ACQUIRE_MODE_NOT_ACQUIRED
- /** Automatically acknowledge (acquire and accept) batches of autoAck
messages.
- * 0 means no automatic acknowledgement. What it means to "acknowledge" a
message depends on
- * acceptMode and acquireMode:
- * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
- * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire"
command
- * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept"
command
- * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire"
and "accept" commands
- */
+ /** Automatically acknowledge (accept) batches of autoAck
+ * messages. 0 means no automatic acknowledgement. This has no
+ * effect for messsages received for a subscription with
+ * ACCEPT_MODE_NODE.*/
unsigned int autoAck;
+ /**
+ * If set to true, messages will be marked as completed (in
+ * windowing mode, completion of a message will cause the credit
+ * used up by that message to be reallocated) once they have been
+ * received. The server will be explicitly notified of all
+ * completed messages when the next accept is sent through the
+ * subscription (either explictly or through autAck). However the
+ * server may also periodically request information on the
+ * completed messages.
+ *
+ * If set to false the application is responsible for completing
+ * messages (@see Session::markCompleted()).
+ */
+ bool autoComplete;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Oct 29 08:15:08 2008
@@ -134,7 +134,7 @@
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR=
BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
-system_tests = client_test quick_perftest quick_topictest run_header_test
+system_tests = client_test quick_perftest quick_topictest run_header_test
quick_txtest
TESTS += start_broker $(system_tests) python_tests stop_broker
run_federation_tests run_acl_tests
EXTRA_DIST += \
@@ -142,6 +142,7 @@
run-unit-tests start_broker python_tests stop_broker
\
quick_topictest \
quick_perftest \
+ quick_txtest \
topictest \
run_header_test \
header_test.py \
Added: incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest?rev=708919&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest Wed Oct 29 08:15:08
2008
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec `dirname $0`/run_test ./txtest --queues 4 --tx-count 10 --quiet
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/quick_txtest
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=708919&r1=708918&r2=708919&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp Wed Oct 29 08:15:08 2008
@@ -51,11 +51,12 @@
uint txCount;
uint totalMsgCount;
bool dtx;
+ bool quiet;
Args() : init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
- dtx(false)
+ dtx(false), quiet(false)
{
addOptions()
@@ -69,7 +70,8 @@
("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages
transferred per transaction")
("tx-count", optValue(txCount, "N"), "number of transactions per
'agent'")
("total-messages", optValue(totalMsgCount, "N"), "total number of
messages in 'circulation'")
- ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
+ ("dtx", optValue(dtx, "yes|no"), "use distributed transactions")
+ ("quiet", optValue(quiet), "reduce output from test");
}
};
@@ -159,7 +161,6 @@
session.messageTransfer(arg::content=out,
arg::acceptMode=1);
}
sub.accept(sub.getUnaccepted());
- session.sendCompletion();
if (opts.dtx) {
session.dtxEnd(arg::xid=xid);
session.dtxPrepare(arg::xid=xid);
@@ -219,7 +220,7 @@
StringSet::iterator next = i + 1;
if (next == queues.end()) next = queues.begin();
- std::cout << "Transfering from " << *i << " to " << *next <<
std::endl;
+ if (!opts.quiet) std::cout << "Transfering from " << *i << " to "
<< *next << std::endl;
agents.push_back(new Transfer(*i, *next));
agents.back().thread = Thread(agents.back());
}
@@ -241,13 +242,13 @@
xidArr.collect(inDoubtXids);
if (inDoubtXids.size()) {
- std::cout << "Recovering DTX in-doubt transaction(s):" <<
std::endl;
+ if (!opts.quiet) std::cout << "Recovering DTX in-doubt
transaction(s):" << std::endl;
framing::StructHelper decoder;
framing::Xid xid;
// abort even, commit odd transactions
for (unsigned i = 0; i < inDoubtXids.size(); i++) {
decoder.decode(xid, inDoubtXids[i]);
- std::cout << (i%2 ? " * aborting " : " * committing ");
+ if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " *
committing ");
xid.print(std::cout);
std::cout << std::endl;
if (i%2) {
@@ -276,7 +277,7 @@
drained.push_back(m.getMessageProperties().getCorrelationId());
++count;
}
- std::cout << "Drained " << count << " messages from " << *i <<
std::endl;
+ if (!opts.quiet) std::cout << "Drained " << count << " messages
from " << *i << std::endl;
}
sort(ids.begin(), ids.end());