Author: gsim
Date: Mon Apr 21 07:37:03 2008
New Revision: 650159
URL: http://svn.apache.org/viewvc?rev=650159&view=rev
Log:
QPID-920: send message-accept for acks (as well as completion)
* AckPolicy now maintains a set of transfered messages for cumulative accepts
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h Mon Apr 21
07:37:03 2008
@@ -21,6 +21,8 @@
*
*/
+#include "qpid/framing/SequenceSet.h"
+
namespace qpid {
namespace client {
@@ -31,21 +33,36 @@
*/
class AckPolicy
{
+ framing::SequenceSet accepted;
size_t interval;
size_t count;
public:
/**
[EMAIL PROTECTED] n: acknowledge every n messages.
- *n==0 means no automatick acknowledgement.
+ *n==0 means no automatic acknowledgement.
*/
AckPolicy(size_t n=1) : interval(n), count(n) {}
- void ack(const Message& msg) {
+ void ack(const Message& msg, Session& session) {
+ accepted.add(msg.getId());
if (!interval) return;
- bool send=(--count==0);
- msg.acknowledge(true, send);
- if (send) count = interval;
+ if (--count==0) {
+ session.markCompleted(msg.getId(), false, true);
+ session.messageAccept(accepted);
+ accepted.clear();
+ count = interval;
+ } else {
+ session.markCompleted(msg.getId(), false, false);
+ }
+ }
+
+ void ackOutstanding(Session& session) {
+ if (!accepted.empty()) {
+ session.messageAccept(accepted);
+ accepted.clear();
+ session.sendCompletion();
+ }
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Mon Apr 21
07:37:03 2008
@@ -23,6 +23,7 @@
#include <sstream>
#include "Channel.h"
#include "qpid/sys/Monitor.h"
+#include "AckPolicy.h"
#include "Message.h"
#include "Connection.h"
#include "Demux.h"
@@ -202,8 +203,10 @@
if (incoming->tryPop(p)) {
msg.populate(*p);
if (ackMode == AUTO_ACK) {
- msg.setSession(session);
- msg.acknowledge(false, true);
+ AckPolicy acker;
+ acker.ack(msg, session);
+ } else {
+ session.markCompleted(msg.getId(), false, false);
}
return true;
}
@@ -260,7 +263,7 @@
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.getExecution().markCompleted(content.getId(), true, send);
+ session.markCompleted(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " <<
destination);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Apr 21
07:37:03 2008
@@ -43,7 +43,7 @@
{
if (listener) {
listener->received(msg);
- autoAck.ack(msg);
+ autoAck.ack(msg, session);
}
}
@@ -72,7 +72,7 @@
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content, session);
+ Message msg(*content);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " <<
msg.getDestination());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Apr 21
07:37:03 2008
@@ -36,10 +36,6 @@
public:
virtual ~Execution() {}
/**
- * Mark the incoming command with the specified id as completed
- */
- virtual void markCompleted(const framing::SequenceNumber& id, bool
cumulative, bool notifyPeer) = 0;
- /**
* Provides access to the demultiplexing function within the
* session implementation
*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Mon Apr 21
07:37:03 2008
@@ -36,8 +36,8 @@
throw ClosedException();
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message m(*content, session);
- autoAck.ack(m);
+ Message m(*content);
+ autoAck.ack(m, session);
return m;
}
else
@@ -46,6 +46,7 @@
}
void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
bool LocalQueue::empty() const
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Mon Apr 21
07:37:03 2008
@@ -47,9 +47,10 @@
bool empty() const;
size_t size() const;
void setAckPolicy(AckPolicy);
+ AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
+ friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp Mon Apr 21
07:37:03 2008
@@ -48,11 +48,6 @@
return getMessageProperties().getApplicationHeaders();
}
- void Message::acknowledge(bool cumulative, bool notifyPeer) const
- {
- const_cast<Session&>(session).getExecution().markCompleted(id,
cumulative, notifyPeer);
- }
-
const framing::MessageTransferBody& Message::getMethod() const
{
return method;
@@ -64,13 +59,10 @@
}
/[EMAIL PROTECTED] for incoming messages */
- Message::Message(const framing::FrameSet& frameset, Session s) :
- method(*frameset.as<framing::MessageTransferBody>()),
id(frameset.getId()), session(s)
+ Message::Message(const framing::FrameSet& frameset) :
+ method(*frameset.as<framing::MessageTransferBody>()),
id(frameset.getId())
{
populate(frameset);
}
-
- /[EMAIL PROTECTED] use for incoming messages. */
- void Message::setSession(Session s) { session=s; }
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Mon Apr 21 07:37:03
2008
@@ -45,19 +45,15 @@
bool isRedelivered() const;
void setRedelivered(bool redelivered);
framing::FieldTable& getHeaders();
- void acknowledge(bool cumulative = true, bool notifyPeer = true) const;
const framing::MessageTransferBody& getMethod() const;
const framing::SequenceNumber& getId() const;
/[EMAIL PROTECTED] for incoming messages */
- Message(const framing::FrameSet& frameset, Session s);
- /[EMAIL PROTECTED] use for incoming messages. */
- void setSession(Session s);
+ Message(const framing::FrameSet& frameset);
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
framing::SequenceNumber id;
- Session session;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp Mon Apr 21
07:37:03 2008
@@ -50,6 +50,16 @@
impl->send(b).wait(*impl);
}
+void SessionBase::markCompleted(const framing::SequenceNumber& id, bool
cumulative, bool notifyPeer)
+{
+ impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase::sendCompletion()
+{
+ impl->sendCompletion();
+}
+
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h Mon Apr 21
07:37:03 2008
@@ -131,6 +131,8 @@
Execution& getExecution();
void sync();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative,
bool notifyPeer);
+ void sendCompletion();
typedef framing::TransferContent DefaultContent;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Apr 21
07:37:03 2008
@@ -518,6 +518,12 @@
void SessionImpl::sendCompletion()
{
+ Lock l(state);
+ sendCompletionImpl();
+}
+
+void SessionImpl::sendCompletionImpl()
+{
proxy.completed(completedIn, true);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Apr 21
07:37:03 2008
@@ -87,6 +87,7 @@
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
+ void sendCompletion();
//NOTE: these are called by the network thread when the connection is
closed or dies
void connectionClosed(uint16_t code, const std::string& text);
@@ -122,7 +123,7 @@
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void sendCompletion();
+ void sendCompletionImpl();
// Note: Following methods are called by network thread in
// response to session controls from the broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Mon
Apr 21 07:37:03 2008
@@ -87,6 +87,8 @@
void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
+
void SubscriptionManager::cancel(const std::string dest)
{
dispatcher.cancel(dest);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Mon Apr
21 07:37:03 2008
@@ -133,6 +133,10 @@
* Default is to acknowledge every message automatically.
*/
void setAckPolicy(const AckPolicy& autoAck);
+ /**
+ *
+ */
+ AckPolicy& getAckPolicy();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp Mon Apr 21
07:37:03 2008
@@ -143,20 +143,24 @@
void SequenceSet::remove(const SequenceNumber& s)
{
- for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >=
i->start; i++) {
- if (i->start == s) {
+ for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >=
i->start; i++) {
+ if (i->contains(s)) {
if (i->start == i->end) {
- ranges.erase(i);
- } else {
+ //range is just a single number, so we can delete the whole
range
+ i = ranges.erase(i);
+ } else if (i->start == s) {
+ //move the start forward to exclude s
++(i->start);
+ } else if (i->end == s) {
+ //move the end backward to exclude s
+ --(i->end);
+ } else {
+ //need to split range pointed to by i
+ Range r(i->start, (uint32_t)s - 1);
+ i->start = s + 1;
+ i = ranges.insert(i, r);
}
- } else if (i->end == s) {
- --(i->end);
- } else if (i->contains(s)) {
- //need to split range pointed to by i
- Range r(i->start, (uint32_t)s - 1);
- i->start = s + 1;
- ranges.insert(i, r);
+ break;
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Apr 21
07:37:03 2008
@@ -20,6 +20,7 @@
*/
#include "unit_test.h"
#include "BrokerFixture.h"
+#include "qpid/client/AckPolicy.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -124,7 +125,8 @@
BOOST_CHECK(msg->isA<MessageTransferBody>());
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
//confirm receipt:
- fix.session.getExecution().markCompleted(msg->getId(), true, true);
+ AckPolicy autoAck;
+ autoAck.ack(Message(*msg), fix.session);
}
QPID_AUTO_TEST_CASE(testDispatcher)
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Mon Apr 21 07:37:03
2008
@@ -124,10 +124,11 @@
if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
FrameSet::shared_ptr incoming = session.get();
if (incoming->isA<MessageTransferBody>()) {
- Message msgIn(*incoming, session);
+ Message msgIn(*incoming);
if (msgIn.getData() == msgOut.getData()) {
if (opts.trace) std::cout << "Received the exepected message."
<< std::endl;
- msgIn.acknowledge();
+ session.messageAccept(SequenceSet(msgIn.getId()));
+ session.markCompleted(msgIn.getId(), true, true);
} else {
print("Received an unexepected message: ", msgIn);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Mon Apr 21 07:37:03
2008
@@ -533,9 +533,9 @@
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
- if (opts.intervalSub)
::usleep(opts.intervalSub*1000);
+ if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
- // multiple publishers. Need an acorray of counters,
+ // multiple publishers. Need an array of counters,
// one per publisher and a publisher ID in the
// message. Careful not to introduce a lot of overhead
// here, e.g. no std::map, std::string etc.
@@ -550,7 +550,7 @@
}
}
if (opts.ack !=0)
- msg.acknowledge(); // Cumulative ack for final batch.
+ subs.getAckPolicy().ackOutstanding(session); // Cumulative
ack for final batch.
AbsTime end=now();
// Report to publisher.
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Mon Apr 21
07:37:03 2008
@@ -159,7 +159,7 @@
if(!!type && StringValue("TERMINATION_REQUEST") == *type){
shutdown();
}else if(!!type && StringValue("REPORT_REQUEST") == *type){
- message.acknowledge();//acknowledge everything upto this point
+ mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything
upto this point
cout <<"Batch ended, sending report." << endl;
//send a report:
report();
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp Mon Apr 21 07:37:03 2008
@@ -144,7 +144,7 @@
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
session.messageTransfer(arg::content=out,
arg::acceptMode=1);
}
- in.acknowledge();
+ lq.getAckPolicy().ackOutstanding(session);
session.txCommit();
}
} catch(const std::exception& e) {