Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageBuilderTest.cpp Tue Apr 22 05:37:29 2008 @@ -22,6 +22,7 @@ #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/framing/frame_functors.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" #include <list> @@ -101,7 +102,7 @@ std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); @@ -124,7 +125,7 @@ std::string exchange("builder-exchange"); std::string key("builder-exchange"); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>(data)); method.setEof(false); @@ -158,7 +159,7 @@ std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); @@ -194,7 +195,7 @@ std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2));
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp Tue Apr 22 05:37:29 2008 @@ -21,7 +21,9 @@ #include "qpid/broker/Message.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid_test_plugin.h" @@ -44,14 +46,14 @@ { string exchange = "MyExchange"; string routingKey = "MyRoutingKey"; - string messageId = "MyMessage"; + Uuid messageId(true); string data1("abcdefg"); string data2("hijklmn"); intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageUtils.h Tue Apr 22 05:37:29 2008 @@ -22,6 +22,8 @@ #include "qpid/broker/Message.h" #include "qpid/broker/MessageDelivery.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/Uuid.h" using namespace qpid; using namespace broker; @@ -29,12 +31,12 @@ struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey, - const string& messageId, uint64_t contentSize = 0) + static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", + const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) { boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp Tue Apr 22 05:37:29 2008 @@ -23,6 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid_test_plugin.h" #include <iostream> #include "boost/format.hpp" @@ -48,6 +49,7 @@ return true; }; void notify() {} + OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable @@ -75,7 +77,7 @@ intrusive_ptr<Message> message(std::string exchange, std::string routingKey) { intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); msg->getFrames().append(header); Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp Tue Apr 22 05:37:29 2008 @@ -18,6 +18,7 @@ * under the License. * */ +#include "MessageUtils.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManager.h" #include "qpid/broker/TxAck.h" @@ -69,14 +70,8 @@ TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ - intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, "exchange", 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - msg->getFrames().append(method); - msg->getFrames().append(header); + intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key")); msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); - msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); QueuedMessage qm(queue.get()); qm.payload = msg; Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/client_test.cpp Tue Apr 22 05:37:29 2008 @@ -33,12 +33,11 @@ #include "qpid/client/Message.h" #include "qpid/client/Session.h" #include "qpid/framing/FrameSet.h" -#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid; using namespace qpid::client; -using qpid::framing::FrameSet; -using qpid::framing::MessageTransferBody; +using namespace qpid::framing; using std::string; struct Args : public qpid::TestOptions { @@ -104,14 +103,14 @@ if (opts.trace) std::cout << "Declared queue." << std::endl; //now bind the queue to the exchange - session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey"); + session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); if (opts.trace) std::cout << "Bound queue to exchange." << std::endl; //create and send a message to the exchange using the routing //key we bound our queue with: Message msgOut(generateData(opts.msgSize)); msgOut.getDeliveryProperties().setRoutingKey("MyKey"); - session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut); + session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); if (opts.trace) print("Published message: ", msgOut); //subscribe to the queue, add sufficient credit and then get @@ -125,13 +124,16 @@ if (opts.trace) std::cout << "Subscribed to queue." << std::endl; FrameSet::shared_ptr incoming = session.get(); if (incoming->isA<MessageTransferBody>()) { - Message msgIn(*incoming, session); + Message msgIn(*incoming); if (msgIn.getData() == msgOut.getData()) { if (opts.trace) std::cout << "Received the exepected message." << std::endl; - msgIn.acknowledge(); + session.messageAccept(SequenceSet(msgIn.getId())); + session.markCompleted(msgIn.getId(), true, true); } else { print("Received an unexepected message: ", msgIn); } + } else { + throw Exception("Unexpected command received"); } //close the session & connection Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/exception_test.cpp Tue Apr 22 05:37:29 2008 @@ -96,7 +96,8 @@ QPID_AUTO_TEST_CASE(NoSuchQueueTest) { ProxySessionFixture fix; - BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), NotFoundException); + fix.session.setSynchronous(true); + BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } QPID_AUTO_TEST_SUITE_END() Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/interop_runner.cpp Tue Apr 22 05:37:29 2008 @@ -203,7 +203,7 @@ bool Listener::invite(const string& name) { TestMap::iterator i = tests.find(name); - test = (i != tests.end()) ? qpid::ptr_map::get_pointer(i) : 0; + test = (i != tests.end()) ? qpid::ptr_map_ptr(i) : 0; return test; } Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/latencytest.cpp Tue Apr 22 05:37:29 2008 @@ -192,7 +192,7 @@ mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(*this, queue); @@ -257,14 +257,13 @@ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } - Completion c; for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - c = session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } - c.sync(); + session.sync(); } void Sender::sendByRate() @@ -283,7 +282,7 @@ uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } uint64_t timeTaken = (current_time() - start) / TIME_USEC; if (timeTaken < 1000) { Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp Tue Apr 22 05:37:29 2008 @@ -210,7 +210,8 @@ void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); - session.queuePurge(arg::queue=name).sync(); + session.queuePurge(arg::queue=name); + session.sync(); } void run() { @@ -334,7 +335,7 @@ << endl; Message msg(data, queue); for (size_t i = 0; i < n; ++i) - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } void run() { // Controller @@ -421,7 +422,6 @@ } void run() { // Publisher - Completion completion; try { string data; size_t offset(0); @@ -459,19 +459,19 @@ // any heap allocation. const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t)); - completion = session.messageTransfer( + session.messageTransfer( arg::destination=destination, arg::content=msg, - arg::confirmMode=opts.confirm); - if (opts.intervalPub) ::usleep(opts.intervalPub*1000); + arg::acceptMode=1); + if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } - if (opts.confirm) completion.sync(); + if (opts.confirm) session.sync(); AbsTime end=now(); double time=secs(start,end); // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); - session.messageTransfer(arg::content=report); + session.messageTransfer(arg::content=report, arg::acceptMode=1); } session.close(); } @@ -496,9 +496,9 @@ arg::exclusive=true, arg::autoDelete=true, arg::durable=opts.durable); - session.queueBind(arg::queue=queue, - arg::exchange=ex, - arg::routingKey=key); + session.exchangeBind(arg::queue=queue, + arg::exchange=ex, + arg::bindingKey=key); } void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { @@ -506,7 +506,7 @@ Message error( QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), "sub_done"); - session.messageTransfer(arg::content=error); + session.messageTransfer(arg::content=error, arg::acceptMode=1); throw Exception(error.getData()); } } @@ -515,12 +515,12 @@ try { SubscriptionManager subs(session); LocalQueue lq(AckPolicy(opts.ack)); - subs.setConfirmMode(opts.ack > 0); + subs.setAcceptMode(opts.ack > 0 ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", "sub_ready")); + session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); for (size_t j = 0; j < opts.iterations; ++j) { @@ -533,9 +533,9 @@ size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - if (opts.intervalSub) ::usleep(opts.intervalSub*1000); + if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. - // multiple publishers. Need an acorray of counters, + // multiple publishers. Need an array of counters, // one per publisher and a publisher ID in the // message. Careful not to introduce a lot of overhead // here, e.g. no std::map, std::string etc. @@ -550,13 +550,13 @@ } } if (opts.ack !=0) - msg.acknowledge(); // Cumulative ack for final batch. + subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); - session.messageTransfer(arg::content=result); + session.messageTransfer(arg::content=result, arg::acceptMode=1); } session.close(); } Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_listener.cpp Tue Apr 22 05:37:29 2008 @@ -114,7 +114,7 @@ } else { session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); } - session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control"); //set up listener SubscriptionManager mgr(session); @@ -123,7 +123,7 @@ mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*-not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(listener, control); @@ -159,7 +159,7 @@ if(!!type && StringValue("TERMINATION_REQUEST") == *type){ shutdown(); }else if(!!type && StringValue("REPORT_REQUEST") == *type){ - message.acknowledge();//acknowledge everything upto this point + mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point cout <<"Batch ended, sending report." << endl; //send a report: report(); @@ -181,7 +181,7 @@ << time/TIME_MSEC << " ms."; Message msg(reportstr.str(), responseQueue); msg.getHeaders().setString("TYPE", "REPORT"); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); if(transactional){ session.txCommit(); } Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/topic_publisher.cpp Tue Apr 22 05:37:29 2008 @@ -164,12 +164,12 @@ AbsTime start = now(); for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); + session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); } //send report request Message reportRequest("", controlTopic); reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } @@ -198,7 +198,7 @@ //send termination request Message terminationRequest("", controlTopic); terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/txtest.cpp Tue Apr 22 05:37:29 2008 @@ -142,9 +142,9 @@ out.setData(in.getData()); out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); - session.messageTransfer(arg::content=out); + session.messageTransfer(arg::content=out, arg::acceptMode=1); } - in.acknowledge(); + lq.getAckPolicy().ackOutstanding(session); session.txCommit(); } } catch(const std::exception& e) { @@ -168,7 +168,8 @@ { //declare queues for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - session.queueDeclare(arg::queue=*i, arg::durable=opts.durable).sync(); + session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); + session.sync(); } Message msg(generateData(opts.size), *queues.begin()); @@ -179,7 +180,7 @@ //publish messages for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { msg.getMessageProperties().setCorrelationId(*i); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } } @@ -205,7 +206,7 @@ { SubscriptionManager subs(session); subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subs.setConfirmMode(false); + subs.setAcceptMode(1/*not-required*/); StringSet drained; //drain each queue and verify the correct set of messages are available @@ -213,7 +214,8 @@ //subscribe, allocate credit and flush LocalQueue lq(AckPolicy(0));//manual acking subs.subscribe(lq, *i, *i); - session.messageFlush(arg::destination=*i).sync(); + session.messageFlush(arg::destination=*i); + session.sync(); uint count(0); while (!lq.empty()) { Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml (original) +++ incubator/qpid/branches/thegreatmerge/qpid/cpp/xml/extra.xml Tue Apr 22 05:37:29 2008 @@ -623,7 +623,7 @@ <class name="message010" index="4"> <doc>blah, blah</doc> - <method name = "transfer" index="1"> + <method name = "transfer" content="1" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> <chassis name="client" implement="MUST" /> @@ -818,7 +818,7 @@ <method name = "declare" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="exchange" domain="shortstr"/> <field name="type" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> <field name="passive" domain="bit"/> @@ -829,7 +829,7 @@ <method name = "delete" index="2"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="exchange" domain="shortstr"/> <field name="if-unused" domain="bit"/> </method> <method name = "query" index="3"> @@ -863,8 +863,8 @@ <method name = "bound" index="6"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="queue" domain="shortstr"/> <field name="exchange" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="binding-key" domain="shortstr"/> <field name="arguments" domain="table"/> <result> @@ -884,7 +884,7 @@ <method name = "declare" index="1"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> <field name="passive" domain="bit"/> <field name="durable" domain="bit"/> @@ -895,25 +895,25 @@ <method name = "delete" index="2"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <field name="if-unused" domain="bit"/> <field name="if-empty" domain="bit"/> </method> <method name = "purge" index="3"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> </method> <method name = "query" index="4"> <doc>blah, blah</doc> <chassis name="server" implement="MUST" /> - <field name="name" domain="shortstr"/> + <field name="queue" domain="shortstr"/> <result> <struct size="long" type="1"> <field name="name" domain="shortstr"/> <field name="alternate-exchange" domain="shortstr"/> - <field name="passive" domain="bit"/> <field name="durable" domain="bit"/> + <field name="exclusive" domain="bit"/> <field name="auto-delete" domain="bit"/> <field name="arguments" domain="table"/> <field name="message-count" domain="long"/> Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Tue Apr 22 05:37:29 2008 @@ -148,7 +148,7 @@ } props.setExpiration(devprop.getExpiration()); UUID mid = mprop.getMessageId(); - props.setMessageId(mid == null ? null : mid.toString()); + props.setMessageId(mid == null ? null : "ID:" + mid.toString()); if (devprop.hasPriority()) { props.setPriority((byte) devprop.getPriority().getValue()); Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py (original) +++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/message.py Tue Apr 22 05:37:29 2008 @@ -92,6 +92,34 @@ #check queue is empty self.assertEqual(0, session.queue_query(queue="test-queue").message_count) + def test_no_local_exclusive_subscribe(self): + """ + Test that the no_local flag is honoured in the consume method + """ + session = self.session + + #setup, declare two queues one of which excludes delivery of + #locally sent messages but is not declared as exclusive + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) + + #send a message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + msg = included.get(timeout=1) + self.assertEqual("deliver-me", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + def test_consume_exclusive(self): """ Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py?rev=650478&r1=650477&r2=650478&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py (original) +++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py Tue Apr 22 05:37:29 2008 @@ -223,7 +223,7 @@ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) session.queue_delete(queue="delete-me") - #check that it has gone be declaring passively + #check that it has gone by declaring passively try: session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted")
