Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp 
(original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp 
Tue Apr 22 05:37:29 2008
@@ -22,6 +22,7 @@
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/framing/frame_functors.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/TypeFilter.h"
 #include "qpid_test_plugin.h"
 #include <list>
@@ -101,7 +102,7 @@
         std::string key("builder-exchange");
 
         AMQFrame method(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, exchange, 0, 0));
+                            ProtocolVersion(), exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
 
         
header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
        
@@ -124,7 +125,7 @@
         std::string exchange("builder-exchange");
         std::string key("builder-exchange");
 
-        AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, 
exchange, 0, 0));
+        AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 
exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
         AMQFrame content(in_place<AMQContentBody>(data));
         method.setEof(false);
@@ -158,7 +159,7 @@
         std::string key("builder-exchange");
 
         AMQFrame method(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, exchange, 0, 0));
+                            ProtocolVersion(), exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
         AMQFrame content1(in_place<AMQContentBody>(data1));
         AMQFrame content2(in_place<AMQContentBody>(data2));
@@ -194,7 +195,7 @@
         std::string key("builder-exchange");
 
         AMQFrame method(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, exchange, 0, 0));
+                            ProtocolVersion(), exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
         AMQFrame content1(in_place<AMQContentBody>(data1));
         AMQFrame content2(in_place<AMQContentBody>(data2));

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp 
Tue Apr 22 05:37:29 2008
@@ -21,7 +21,9 @@
 #include "qpid/broker/Message.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
 
 #include "qpid_test_plugin.h"
 
@@ -44,14 +46,14 @@
     {
         string exchange = "MyExchange";
         string routingKey = "MyRoutingKey";
-        string messageId = "MyMessage";
+        Uuid messageId(true);
         string data1("abcdefg");
         string data2("hijklmn");
 
         intrusive_ptr<Message> msg(new Message());
 
         AMQFrame method(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, exchange, 0, 0));
+                            ProtocolVersion(), exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
         AMQFrame content1(in_place<AMQContentBody>(data1));
         AMQFrame content2(in_place<AMQContentBody>(data2));

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h Tue 
Apr 22 05:37:29 2008
@@ -22,6 +22,8 @@
 #include "qpid/broker/Message.h"
 #include "qpid/broker/MessageDelivery.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Uuid.h"
 
 using namespace qpid;
 using namespace broker;
@@ -29,12 +31,12 @@
 
 struct MessageUtils
 {
-    static boost::intrusive_ptr<Message> createMessage(const string& exchange, 
const string& routingKey, 
-                                             const string& messageId, uint64_t 
contentSize = 0)
+    static boost::intrusive_ptr<Message> createMessage(const string& 
exchange="", const string& routingKey="", 
+                                                       const Uuid& 
messageId=Uuid(true), uint64_t contentSize = 0)
     {
         boost::intrusive_ptr<Message> msg(new Message());
 
-        AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, 
exchange, 0, 0));
+        AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 
exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
 
         msg->getFrames().append(method);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp Tue 
Apr 22 05:37:29 2008
@@ -23,6 +23,7 @@
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid_test_plugin.h"
 #include <iostream>
 #include "boost/format.hpp"
@@ -48,6 +49,7 @@
         return true;
     };
     void notify() {}
+    OwnershipToken* getSession() { return 0; }
 };
 
 class FailOnDeliver : public Deliverable
@@ -75,7 +77,7 @@
     intrusive_ptr<Message> message(std::string exchange, std::string 
routingKey) {
         intrusive_ptr<Message> msg(new Message());
         AMQFrame method(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, exchange, 0, 0));
+                            ProtocolVersion(), exchange, 0, 0));
         AMQFrame header(in_place<AMQHeaderBody>());
         msg->getFrames().append(method);
         msg->getFrames().append(header);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp Tue 
Apr 22 05:37:29 2008
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "MessageUtils.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/RecoveryManager.h"
 #include "qpid/broker/TxAck.h"
@@ -69,14 +70,8 @@
     TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), 
op(acked, deliveries)
     {
         for(int i = 0; i < 10; i++){
-            intrusive_ptr<Message> msg(new Message());
-            AMQFrame method(in_place<MessageTransferBody>(
-                                ProtocolVersion(), 0, "exchange", 0, 0));
-            AMQFrame header(in_place<AMQHeaderBody>());
-            msg->getFrames().append(method);
-            msg->getFrames().append(header);
+            intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", 
"routing_key"));
             
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
-            
msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
             messages.push_back(msg);
             QueuedMessage qm(queue.get());
             qm.payload = msg;

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp 
Tue Apr 22 05:37:29 2008
@@ -33,12 +33,11 @@
 #include "qpid/client/Message.h"
 #include "qpid/client/Session.h"
 #include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/all_method_bodies.h"
 
 using namespace qpid;
 using namespace qpid::client;
-using qpid::framing::FrameSet;
-using qpid::framing::MessageTransferBody;
+using namespace qpid::framing;
 using std::string;
 
 struct Args : public qpid::TestOptions {
@@ -104,14 +103,14 @@
        if (opts.trace) std::cout << "Declared queue." << std::endl;
 
         //now bind the queue to the exchange
-       session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", 
arg::routingKey="MyKey");
+       session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", 
arg::bindingKey="MyKey");
        if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
 
         //create and send a message to the exchange using the routing
         //key we bound our queue with:
        Message msgOut(generateData(opts.msgSize));
         msgOut.getDeliveryProperties().setRoutingKey("MyKey");
-        session.messageTransfer(arg::destination="MyExchange", 
arg::content=msgOut);
+        session.messageTransfer(arg::destination="MyExchange", 
arg::content=msgOut, arg::acceptMode=1);
        if (opts.trace) print("Published message: ", msgOut);
 
         //subscribe to the queue, add sufficient credit and then get
@@ -125,13 +124,16 @@
        if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
         FrameSet::shared_ptr incoming = session.get();
         if (incoming->isA<MessageTransferBody>()) {
-            Message msgIn(*incoming, session);
+            Message msgIn(*incoming);
             if (msgIn.getData() == msgOut.getData()) {
                 if (opts.trace) std::cout << "Received the exepected message." 
<< std::endl;
-                msgIn.acknowledge();
+                session.messageAccept(SequenceSet(msgIn.getId()));
+                session.markCompleted(msgIn.getId(), true, true);
             } else {
                 print("Received an unexepected message: ", msgIn);
             }
+        } else {
+            throw Exception("Unexpected command received");
         }
         
         //close the session & connection

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp 
Tue Apr 22 05:37:29 2008
@@ -96,7 +96,8 @@
 
 QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
     ProxySessionFixture fix;
-    BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), 
NotFoundException);
+    fix.session.setSynchronous(true);
+    BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), 
NotFoundException);
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp 
Tue Apr 22 05:37:29 2008
@@ -203,7 +203,7 @@
 bool Listener::invite(const string& name)
 {
     TestMap::iterator i = tests.find(name);
-    test = (i != tests.end()) ? qpid::ptr_map::get_pointer(i) : 0;
+    test = (i != tests.end()) ? qpid::ptr_map_ptr(i) : 0;
     return test;
 }
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp 
Tue Apr 22 05:37:29 2008
@@ -192,7 +192,7 @@
         mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
         mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, 
true);
     } else {
-        mgr.setConfirmMode(false);
+        mgr.setAcceptMode(1/*not-required*/);
         mgr.setFlowControl(SubscriptionManager::UNLIMITED, 
SubscriptionManager::UNLIMITED, false);
     }
     mgr.subscribe(*this, queue);    
@@ -257,14 +257,13 @@
         msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
     }
 
-    Completion c;
     for (uint i = 0; i < opts.count; i++) {
         uint64_t sentAt(current_time());
         msg.getDeliveryProperties().setTimestamp(sentAt);
         //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support 
for uint64_t to field tables
-        c = session.messageTransfer(arg::content=msg);
+        session.messageTransfer(arg::content=msg, arg::acceptMode=1);
     }
-    c.sync();
+    session.sync();
 }
 
 void Sender::sendByRate()
@@ -283,7 +282,7 @@
             uint64_t sentAt(current_time());
             msg.getDeliveryProperties().setTimestamp(sentAt);
             //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add 
support for uint64_t to field tables
-            session.messageTransfer(arg::content=msg);
+            session.messageTransfer(arg::content=msg, arg::acceptMode=1);
         }
         uint64_t timeTaken = (current_time() - start) / TIME_USEC;
         if (timeTaken < 1000) {

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp Tue 
Apr 22 05:37:29 2008
@@ -210,7 +210,8 @@
     
     void queueInit(string name, bool durable=false, const framing::FieldTable& 
settings=framing::FieldTable()) {
         session.queueDeclare(arg::queue=name, arg::durable=durable, 
arg::arguments=settings);
-        session.queuePurge(arg::queue=name).sync();
+        session.queuePurge(arg::queue=name);
+        session.sync();
     }
 
     void run() {
@@ -334,7 +335,7 @@
                  << endl;
         Message msg(data, queue);
         for (size_t i = 0; i < n; ++i) 
-            session.messageTransfer(arg::content=msg);
+            session.messageTransfer(arg::content=msg, arg::acceptMode=1);
     }
 
     void run() {                // Controller
@@ -421,7 +422,6 @@
     }
     
     void run() {                // Publisher
-        Completion completion;
         try {
             string data;
             size_t offset(0);
@@ -459,19 +459,19 @@
                     // any heap allocation.
                     const_cast<std::string&>(msg.getData()).replace(offset, 
sizeof(uint32_t), 
                                                                     
reinterpret_cast<const char*>(&i), sizeof(uint32_t));
-                    completion = session.messageTransfer(
+                    session.messageTransfer(
                         arg::destination=destination,
                         arg::content=msg,
-                        arg::confirmMode=opts.confirm);
-                           if (opts.intervalPub) 
::usleep(opts.intervalPub*1000);
+                        arg::acceptMode=1);
+                    if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
                 }
-                if (opts.confirm) completion.sync();
+                if (opts.confirm) session.sync();
                 AbsTime end=now();
                 double time=secs(start,end);
                 
                 // Send result to controller.
                 Message report(lexical_cast<string>(opts.count/time), 
"pub_done");
-                session.messageTransfer(arg::content=report);
+                session.messageTransfer(arg::content=report, 
arg::acceptMode=1);
             }
             session.close();
         }
@@ -496,9 +496,9 @@
                              arg::exclusive=true,
                              arg::autoDelete=true,
                              arg::durable=opts.durable);
-        session.queueBind(arg::queue=queue,
-                          arg::exchange=ex,
-                          arg::routingKey=key);
+        session.exchangeBind(arg::queue=queue,
+                             arg::exchange=ex,
+                             arg::bindingKey=key);
     }
 
     void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) 
{
@@ -506,7 +506,7 @@
             Message error(
                 QPID_MSG("Sequence error: expected  n" << test << expect << " 
but got " << actual),
                 "sub_done");
-            session.messageTransfer(arg::content=error);
+            session.messageTransfer(arg::content=error, arg::acceptMode=1);
             throw Exception(error.getData());
         }
     }
@@ -515,12 +515,12 @@
         try {
             SubscriptionManager subs(session);
             LocalQueue lq(AckPolicy(opts.ack));
-            subs.setConfirmMode(opts.ack > 0);
+            subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
             subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
                                 false);
             subs.subscribe(lq, queue);
             // Notify controller we are ready.
-            session.messageTransfer(arg::content=Message("ready", 
"sub_ready"));
+            session.messageTransfer(arg::content=Message("ready", 
"sub_ready"), arg::acceptMode=1);
 
             
             for (size_t j = 0; j < opts.iterations; ++j) {
@@ -533,9 +533,9 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
-                           if (opts.intervalSub) 
::usleep(opts.intervalSub*1000);
+                    if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
-                    // multiple publishers. Need an acorray of counters,
+                    // multiple publishers. Need an array of counters,
                     // one per publisher and a publisher ID in the
                     // message. Careful not to introduce a lot of overhead
                     // here, e.g. no std::map, std::string etc.
@@ -550,13 +550,13 @@
                     }
                 }
                 if (opts.ack !=0)
-                    msg.acknowledge(); // Cumulative ack for final batch.
+                    subs.getAckPolicy().ackOutstanding(session); // Cumulative 
ack for final batch.
                 AbsTime end=now();
 
                 // Report to publisher.
                 Message 
result(lexical_cast<string>(opts.subQuota/secs(start,end)),
                                "sub_done");
-                session.messageTransfer(arg::content=result);
+                session.messageTransfer(arg::content=result, 
arg::acceptMode=1);
             }
             session.close();
         }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp 
Tue Apr 22 05:37:29 2008
@@ -114,7 +114,7 @@
             } else {
                 session.queueDeclare(arg::queue=control, arg::exclusive=true, 
arg::autoDelete=true);
             }
-            session.queueBind(arg::exchange="amq.topic", arg::queue=control, 
arg::routingKey="topic_control");
+            session.exchangeBind(arg::exchange="amq.topic", 
arg::queue=control, arg::bindingKey="topic_control");
 
             //set up listener
             SubscriptionManager mgr(session);
@@ -123,7 +123,7 @@
                 mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : 
(args.prefetch / 2)));
                 mgr.setFlowControl(args.prefetch, 
SubscriptionManager::UNLIMITED, true);
             } else {
-                mgr.setConfirmMode(false);
+                mgr.setAcceptMode(1/*-not-required*/);
                 mgr.setFlowControl(SubscriptionManager::UNLIMITED, 
SubscriptionManager::UNLIMITED, false);
             }
             mgr.subscribe(listener, control);
@@ -159,7 +159,7 @@
     if(!!type && StringValue("TERMINATION_REQUEST") == *type){
         shutdown();
     }else if(!!type && StringValue("REPORT_REQUEST") == *type){
-        message.acknowledge();//acknowledge everything upto this point
+        mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything 
upto this point
         cout <<"Batch ended, sending report." << endl;
         //send a report:
         report();
@@ -181,7 +181,7 @@
               << time/TIME_MSEC << " ms.";
     Message msg(reportstr.str(), responseQueue);
     msg.getHeaders().setString("TYPE", "REPORT");
-    session.messageTransfer(arg::content=msg);
+    session.messageTransfer(arg::content=msg, arg::acceptMode=1);
     if(transactional){
         session.txCommit();
     }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp 
(original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp 
Tue Apr 22 05:37:29 2008
@@ -164,12 +164,12 @@
     AbsTime start = now();
     
     for(int i = 0; i < msgs; i++){
-        session.messageTransfer(arg::content=msg, 
arg::destination="amq.topic");
+        session.messageTransfer(arg::content=msg, 
arg::destination="amq.topic", arg::acceptMode=1);
     }
     //send report request
     Message reportRequest("", controlTopic);
     reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
-    session.messageTransfer(arg::content=reportRequest, 
arg::destination="amq.topic");
+    session.messageTransfer(arg::content=reportRequest, 
arg::destination="amq.topic", arg::acceptMode=1);
     if(transactional){
         session.txCommit();
     }    
@@ -198,7 +198,7 @@
     //send termination request
     Message terminationRequest("", controlTopic);
     terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
-    session.messageTransfer(arg::content=terminationRequest, 
arg::destination="amq.topic");
+    session.messageTransfer(arg::content=terminationRequest, 
arg::destination="amq.topic", arg::acceptMode=1);
     if(transactional){
         session.txCommit();
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp Tue Apr 
22 05:37:29 2008
@@ -142,9 +142,9 @@
                     out.setData(in.getData());
                     
out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId());
                     
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
-                    session.messageTransfer(arg::content=out);
+                    session.messageTransfer(arg::content=out, 
arg::acceptMode=1);
                 }
-                in.acknowledge();
+                lq.getAckPolicy().ackOutstanding(session);
                 session.txCommit();
             }
         } catch(const std::exception& e) {
@@ -168,7 +168,8 @@
     {
         //declare queues
         for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
-            session.queueDeclare(arg::queue=*i, 
arg::durable=opts.durable).sync();
+            session.queueDeclare(arg::queue=*i, arg::durable=opts.durable);
+            session.sync();
         }
 
         Message msg(generateData(opts.size), *queues.begin());
@@ -179,7 +180,7 @@
         //publish messages
         for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {
             msg.getMessageProperties().setCorrelationId(*i);
-            session.messageTransfer(arg::content=msg);
+            session.messageTransfer(arg::content=msg, arg::acceptMode=1);
         }
     }
 
@@ -205,7 +206,7 @@
     {
         SubscriptionManager subs(session);
         subs.setFlowControl(SubscriptionManager::UNLIMITED, 
SubscriptionManager::UNLIMITED, false);
-        subs.setConfirmMode(false);
+        subs.setAcceptMode(1/*not-required*/);
 
         StringSet drained;
         //drain each queue and verify the correct set of messages are available
@@ -213,7 +214,8 @@
             //subscribe, allocate credit and flush
             LocalQueue lq(AckPolicy(0));//manual acking
             subs.subscribe(lq, *i, *i);
-            session.messageFlush(arg::destination=*i).sync();
+            session.messageFlush(arg::destination=*i);
+            session.sync();
 
             uint count(0);
             while (!lq.empty()) {

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml Tue Apr 22 
05:37:29 2008
@@ -623,7 +623,7 @@
 
 <class name="message010" index="4">
     <doc>blah, blah</doc>
-    <method name = "transfer" index="1">
+    <method name = "transfer" content="1" index="1">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
         <chassis name="client" implement="MUST" />
@@ -818,7 +818,7 @@
     <method name = "declare" index="1">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="exchange" domain="shortstr"/>
         <field name="type" domain="shortstr"/>
         <field name="alternate-exchange" domain="shortstr"/>
         <field name="passive" domain="bit"/>
@@ -829,7 +829,7 @@
     <method name = "delete" index="2">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="exchange" domain="shortstr"/>
         <field name="if-unused" domain="bit"/>
     </method>
     <method name = "query" index="3">
@@ -863,8 +863,8 @@
     <method name = "bound" index="6">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="queue" domain="shortstr"/>
         <field name="exchange" domain="shortstr"/>
+        <field name="queue" domain="shortstr"/>
         <field name="binding-key" domain="shortstr"/>
         <field name="arguments" domain="table"/>
         <result>
@@ -884,7 +884,7 @@
     <method name = "declare" index="1">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="queue" domain="shortstr"/>
         <field name="alternate-exchange" domain="shortstr"/>
         <field name="passive" domain="bit"/>
         <field name="durable" domain="bit"/>
@@ -895,25 +895,25 @@
     <method name = "delete" index="2">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="queue" domain="shortstr"/>
         <field name="if-unused" domain="bit"/>
         <field name="if-empty" domain="bit"/>
     </method>
     <method name = "purge" index="3">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="queue" domain="shortstr"/>
     </method>
     <method name = "query" index="4">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
-        <field name="name" domain="shortstr"/>
+        <field name="queue" domain="shortstr"/>
         <result>
             <struct size="long" type="1">
                 <field name="name" domain="shortstr"/>
                 <field name="alternate-exchange" domain="shortstr"/>
-                <field name="passive" domain="bit"/>
                 <field name="durable" domain="bit"/>
+                <field name="exclusive" domain="bit"/>
                 <field name="auto-delete" domain="bit"/>
                 <field name="arguments" domain="table"/>
                 <field name="message-count" domain="long"/>

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Tue Apr 22 05:37:29 2008
@@ -148,7 +148,7 @@
         }
         props.setExpiration(devprop.getExpiration());
         UUID mid = mprop.getMessageId();
-        props.setMessageId(mid == null ? null : mid.toString());
+        props.setMessageId(mid == null ? null : "ID:" + mid.toString());
         if (devprop.hasPriority())
         {
             props.setPriority((byte) devprop.getPriority().getValue());

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py Tue 
Apr 22 05:37:29 2008
@@ -92,6 +92,34 @@
         #check queue is empty
         self.assertEqual(0, 
session.queue_query(queue="test-queue").message_count)
 
+    def test_no_local_exclusive_subscribe(self):
+        """
+        Test that the no_local flag is honoured in the consume method
+        """
+        session = self.session
+
+        #setup, declare two queues one of which excludes delivery of
+        #locally sent messages but is not declared as exclusive
+        session.queue_declare(queue="test-queue-1a", exclusive=True, 
auto_delete=True)
+        session.queue_declare(queue="test-queue-1b", auto_delete=True, 
arguments={'no-local':'true'})
+        #establish two consumers 
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b", 
exclusive=True)
+
+        #send a message
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"),
 "deliver-me"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"),
 "dont-deliver-me"))
+
+        #check the queues of the two consumers
+        excluded = session.incoming("local_excluded")
+        included = session.incoming("local_included")
+        msg = included.get(timeout=1)
+        self.assertEqual("deliver-me", msg.body)
+        try:
+            excluded.get(timeout=1)
+            self.fail("Received locally published message though 
no_local=true")
+        except Empty: None
+
 
     def test_consume_exclusive(self):
         """

Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py?rev=650478&r1=650477&r2=650478&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py Tue 
Apr 22 05:37:29 2008
@@ -223,7 +223,7 @@
         
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "b"))
         
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "c"))
         session.queue_delete(queue="delete-me")
-        #check that it has gone be declaring passively
+        #check that it has gone by declaring passively
         try:
             session.queue_declare(queue="delete-me", passive=True)
             self.fail("Queue has not been deleted")


Reply via email to