Author: gsim
Date: Mon Apr 21 07:37:03 2008
New Revision: 650159

URL: http://svn.apache.org/viewvc?rev=650159&view=rev
Log:
QPID-920: send message-accept for acks (as well as completion)
* AckPolicy now maintains a set of transferred messages for cumulative accepts


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h Mon Apr 21 
07:37:03 2008
@@ -21,6 +21,8 @@
  *
  */
 
+#include "qpid/framing/SequenceSet.h"
+
 namespace qpid {
 namespace client {
 
@@ -31,21 +33,36 @@
  */
 class AckPolicy
 {
+    framing::SequenceSet accepted;
     size_t interval;
     size_t count;
 
   public:
     /**
      *@param n: acknowledge every n messages.
-     *n==0 means no automatick acknowledgement.
+     *n==0 means no automatic acknowledgement.
      */
     AckPolicy(size_t n=1) : interval(n), count(n) {}
 
-    void ack(const Message& msg) {
+    void ack(const Message& msg, Session& session) {
+        accepted.add(msg.getId());
         if (!interval) return;
-        bool send=(--count==0);
-        msg.acknowledge(true, send);
-        if (send) count = interval;
+        if (--count==0) {
+            session.markCompleted(msg.getId(), false, true);        
+            session.messageAccept(accepted);
+            accepted.clear();
+            count = interval;
+        } else {
+            session.markCompleted(msg.getId(), false, false);        
+        }
+    }
+
+    void ackOutstanding(Session& session) {
+        if (!accepted.empty()) {
+            session.messageAccept(accepted);
+            accepted.clear();
+            session.sendCompletion();
+        }
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Mon Apr 21 
07:37:03 2008
@@ -23,6 +23,7 @@
 #include <sstream>
 #include "Channel.h"
 #include "qpid/sys/Monitor.h"
+#include "AckPolicy.h"
 #include "Message.h"
 #include "Connection.h"
 #include "Demux.h"
@@ -202,8 +203,10 @@
     if (incoming->tryPop(p)) {
         msg.populate(*p);
         if (ackMode == AUTO_ACK) {
-            msg.setSession(session);
-            msg.acknowledge(false, true);
+            AckPolicy acker;
+            acker.ack(msg, session);
+        } else {
+            session.markCompleted(msg.getId(), false, false);
         }
         return true;
     }
@@ -260,7 +263,7 @@
             bool send = i->second.ackMode == AUTO_ACK
                 || (prefetch &&  ++(i->second.count) > (prefetch / 2));
             if (send) i->second.count = 0;
-            session.getExecution().markCompleted(content.getId(), true, send);
+            session.markCompleted(content.getId(), true, send);
         }
     } else {
         QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);                        

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Apr 21 
07:37:03 2008
@@ -43,7 +43,7 @@
 {
     if (listener) {
         listener->received(msg);
-        autoAck.ack(msg);
+        autoAck.ack(msg, session);
     }
 }
 
@@ -72,7 +72,7 @@
             Mutex::ScopedUnlock u(lock);
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
-                Message msg(*content, session);
+                Message msg(*content);
                 Subscriber::shared_ptr listener = find(msg.getDestination());
                 if (!listener) {
                     QPID_LOG(error, "No listener found for destination " << msg.getDestination());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Apr 21 
07:37:03 2008
@@ -36,10 +36,6 @@
 public:
     virtual ~Execution() {}
     /**
-     * Mark the incoming command with the specified id as completed
-     */
-    virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0;
-    /**
      * Provides access to the demultiplexing function within the
      * session implementation
      */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Mon Apr 21 
07:37:03 2008
@@ -36,8 +36,8 @@
         throw ClosedException();
     FrameSet::shared_ptr content = queue->pop();
     if (content->isA<MessageTransferBody>()) {
-        Message m(*content, session);
-        autoAck.ack(m);
+        Message m(*content);
+        autoAck.ack(m, session);
         return m;
     }
     else
@@ -46,6 +46,7 @@
 }
 
 void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
 
 bool LocalQueue::empty() const
 { 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Mon Apr 21 
07:37:03 2008
@@ -47,9 +47,10 @@
     bool empty() const;
     size_t size() const;
     void setAckPolicy(AckPolicy);
+    AckPolicy& getAckPolicy();
 
   private:
-  friend class SubscriptionManager;
+    friend class SubscriptionManager;
     Session session;
     Demux::QueuePtr queue;
     AckPolicy autoAck;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.cpp Mon Apr 21 
07:37:03 2008
@@ -48,11 +48,6 @@
         return getMessageProperties().getApplicationHeaders(); 
     }
 
-    void Message::acknowledge(bool cumulative, bool notifyPeer) const
-    {
-        const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer);
-    }
-
     const framing::MessageTransferBody& Message::getMethod() const
     {
         return method;
@@ -64,13 +59,10 @@
     }
 
     /**@internal for incoming messages */
-    Message::Message(const framing::FrameSet& frameset, Session s) :
-        method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
+    Message::Message(const framing::FrameSet& frameset) :
+        method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
     {
         populate(frameset);
     }
-
-    /**@internal use for incoming messages. */
-    void Message::setSession(Session s) { session=s; }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Mon Apr 21 07:37:03 
2008
@@ -45,19 +45,15 @@
     bool isRedelivered() const;
     void setRedelivered(bool redelivered);
     framing::FieldTable& getHeaders();
-    void acknowledge(bool cumulative = true, bool notifyPeer = true) const;
     const framing::MessageTransferBody& getMethod() const;
     const framing::SequenceNumber& getId() const;
 
     /**@internal for incoming messages */
-    Message(const framing::FrameSet& frameset, Session s);
-    /**@internal use for incoming messages. */
-    void setSession(Session s);
+    Message(const framing::FrameSet& frameset);
 private:
     //method and id are only set for received messages:
     framing::MessageTransferBody method;
     framing::SequenceNumber id;
-    Session session;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp Mon Apr 21 
07:37:03 2008
@@ -50,6 +50,16 @@
     impl->send(b).wait(*impl);
 }
 
+void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+    impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase::sendCompletion()
+{
+    impl->sendCompletion();
+}
+
 Uuid SessionBase::getId() const { return impl->getId(); }
 framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h Mon Apr 21 
07:37:03 2008
@@ -131,6 +131,8 @@
     
     Execution& getExecution();
     void sync();
+    void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+    void sendCompletion();
     
     typedef framing::TransferContent DefaultContent;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Apr 21 
07:37:03 2008
@@ -518,6 +518,12 @@
 
 void SessionImpl::sendCompletion()
 {
+    Lock l(state);
+    sendCompletionImpl();
+}
+
+void SessionImpl::sendCompletionImpl()
+{
     proxy.completed(completedIn, true);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Apr 21 
07:37:03 2008
@@ -87,6 +87,7 @@
     bool isComplete(const framing::SequenceNumber& id);
     bool isCompleteUpTo(const framing::SequenceNumber& id);
     void waitForCompletion(const framing::SequenceNumber& id);
+    void sendCompletion();
 
     //NOTE: these are called by the network thread when the connection is closed or dies
     void connectionClosed(uint16_t code, const std::string& text);
@@ -122,7 +123,7 @@
     void sendContent(const framing::MethodContent&);
     void waitForCompletionImpl(const framing::SequenceNumber& id);
 
-    void sendCompletion();
+    void sendCompletionImpl();
 
     // Note: Following methods are called by network thread in
     // response to session controls from the broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Mon 
Apr 21 07:37:03 2008
@@ -87,6 +87,8 @@
 
 void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
 
+AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } 
+
 void SubscriptionManager::cancel(const std::string dest)
 {
     dispatcher.cancel(dest);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Mon Apr 
21 07:37:03 2008
@@ -133,6 +133,10 @@
      * Default is to acknowledge every message automatically.
      */
     void setAckPolicy(const AckPolicy& autoAck);
+    /**
+     *
+     */
+     AckPolicy& getAckPolicy();
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp Mon Apr 21 
07:37:03 2008
@@ -143,20 +143,24 @@
 
 void SequenceSet::remove(const SequenceNumber& s)
 {
-    for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) {        
-        if (i->start == s) {
+    for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) {
+        if (i->contains(s)) {
             if (i->start == i->end) {
-                ranges.erase(i);
-            } else {
+                //range is just a single number, so we can delete the whole range
+                i = ranges.erase(i);
+            } else if (i->start == s) {
+                //move the start forward to exclude s
                 ++(i->start);
+            } else if (i->end == s) {
+                //move the end backward to exclude s
+                --(i->end);
+            } else {
+                //need to split range pointed to by i
+                Range r(i->start, (uint32_t)s - 1);
+                i->start = s + 1;
+                i = ranges.insert(i, r);
             }
-        } else if (i->end == s) {
-            --(i->end);
-        } else if (i->contains(s)) {
-            //need to split range pointed to by i
-            Range r(i->start, (uint32_t)s - 1);
-            i->start = s + 1;
-            ranges.insert(i, r);
+            break;
         }
     }    
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Apr 21 
07:37:03 2008
@@ -20,6 +20,7 @@
  */
 #include "unit_test.h"
 #include "BrokerFixture.h"
+#include "qpid/client/AckPolicy.h"
 #include "qpid/client/Dispatcher.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
@@ -124,7 +125,8 @@
     BOOST_CHECK(msg->isA<MessageTransferBody>());
     BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
     //confirm receipt:
-    fix.session.getExecution().markCompleted(msg->getId(), true, true);
+    AckPolicy autoAck;
+    autoAck.ack(Message(*msg), fix.session);
 }
 
 QPID_AUTO_TEST_CASE(testDispatcher)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Mon Apr 21 07:37:03 
2008
@@ -124,10 +124,11 @@
        if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
         FrameSet::shared_ptr incoming = session.get();
         if (incoming->isA<MessageTransferBody>()) {
-            Message msgIn(*incoming, session);
+            Message msgIn(*incoming);
             if (msgIn.getData() == msgOut.getData()) {
                 if (opts.trace) std::cout << "Received the exepected message." << std::endl;
-                msgIn.acknowledge();
+                session.messageAccept(SequenceSet(msgIn.getId()));
+                session.markCompleted(msgIn.getId(), true, true);
             } else {
                 print("Received an unexepected message: ", msgIn);
             }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Mon Apr 21 07:37:03 
2008
@@ -533,9 +533,9 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
-                           if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
+                    if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
-                    // multiple publishers. Need an acorray of counters,
+                    // multiple publishers. Need an array of counters,
                     // one per publisher and a publisher ID in the
                     // message. Careful not to introduce a lot of overhead
                     // here, e.g. no std::map, std::string etc.
@@ -550,7 +550,7 @@
                     }
                 }
                 if (opts.ack !=0)
-                    msg.acknowledge(); // Cumulative ack for final batch.
+                    subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
                 AbsTime end=now();
 
                 // Report to publisher.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Mon Apr 21 
07:37:03 2008
@@ -159,7 +159,7 @@
     if(!!type && StringValue("TERMINATION_REQUEST") == *type){
         shutdown();
     }else if(!!type && StringValue("REPORT_REQUEST") == *type){
-        message.acknowledge();//acknowledge everything upto this point
+        mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
         cout <<"Batch ended, sending report." << endl;
         //send a report:
         report();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=650159&r1=650158&r2=650159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp Mon Apr 21 07:37:03 2008
@@ -144,7 +144,7 @@
                     out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
                     session.messageTransfer(arg::content=out, arg::acceptMode=1);
                 }
-                in.acknowledge();
+                lq.getAckPolicy().ackOutstanding(session);
                 session.txCommit();
             }
         } catch(const std::exception& e) {


Reply via email to