Author: gsim
Date: Tue Oct 10 03:06:36 2006
New Revision: 454677

URL: http://svn.apache.org/viewvc?view=rev&rev=454677
Log:
Implementation and tests for basic_qos (i.e. prefetching)


Modified:
    incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
    incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/broker/test/ChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h
    incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp
    incubator/qpid/trunk/qpid/python/qpid/   (props changed)
    incubator/qpid/trunk/qpid/python/tests/   (props changed)
    incubator/qpid/trunk/qpid/python/tests/basic.py

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h Tue Oct 10 03:06:36 2006
@@ -45,10 +45,12 @@
                 Queue::shared_ptr queue;
                 ConnectionToken* const connection;
                 const bool ackExpected;
+                bool blocked;
             public:
                 ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr 
queue, ConnectionToken* const connection, bool ack);
                 virtual bool deliver(Message::shared_ptr& msg);            
                 void cancel();
+                void requestDispatch();
             };
 
             typedef std::map<string,ConsumerImpl*>::iterator 
consumer_iterator; 
@@ -87,6 +89,14 @@
                 void operator()(AckRecord& record) const;
             };
 
+            class AddSize{
+                u_int32_t size;
+            public:
+                AddSize();
+                void operator()(AckRecord& record);
+                u_int32_t getSize();
+            };
+
             const int id;
             qpid::framing::OutputHandler* out;
             u_int64_t deliveryTag;
@@ -95,6 +105,7 @@
             std::map<string, ConsumerImpl*> consumers;
             u_int32_t prefetchSize;    
             u_int16_t prefetchCount;    
+            u_int32_t outstandingSize;    
             u_int32_t framesize;
             Message::shared_ptr message;
             NameGenerator tagGenerator;
@@ -103,12 +114,15 @@
 
             void deliver(Message::shared_ptr& msg, string& tag, 
Queue::shared_ptr& queue, bool ackExpected);            
             void checkMessage(const std::string& text);
+            bool checkPrefetch(Message::shared_ptr& msg);
+            void cancel(consumer_iterator consumer);
 
-            template<class Operation> void processMessage(Operation route){
+            template<class Operation> Operation processMessage(Operation 
route){
                 if(message->isComplete()){
                     route(message);
                     message.reset();
                 }
+                return route;
             }
 
         
@@ -119,9 +133,9 @@
             inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
             inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = 
size; }
             inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount 
= count; }
-            bool exists(string& consumerTag);
+            bool exists(const string& consumerTag);
             void consume(string& tag, Queue::shared_ptr queue, bool acks, bool 
exclusive, ConnectionToken* const connection = 0);
-            void cancel(string& tag);
+            void cancel(const string& tag);
             void begin();
             void close();
             void commit();
@@ -142,10 +156,10 @@
              * there is no content routes it using the functor passed
              * in.
              */
-            template<class Operation> void 
handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+            template<class Operation> Operation 
handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
                 checkMessage("Invalid message sequence: got header before 
publish.");
                 message->setHeader(header);
-                processMessage(route);
+                return processMessage(route);
             }
 
             /**
@@ -153,13 +167,15 @@
              * if this completes the message, routes it using the
              * functor passed in.
              */
-            template<class Operation> void 
handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation 
route){
+            template<class Operation> Operation 
handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation 
route){
                 checkMessage("Invalid message sequence: got content before 
publish.");
                 message->addContent(content);
-                processMessage(route);
+                return processMessage(route);
             }
 
         };
+
+        struct InvalidAckException{};
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h Tue Oct 10 03:06:36 2006
@@ -47,8 +47,7 @@
             bool redelivered;
             qpid::framing::AMQHeaderBody::shared_ptr header;
             content_list content;
-
-            u_int64_t contentSize();
+            u_int64_t size;
 
         public:
             typedef std::tr1::shared_ptr<Message> shared_ptr;
@@ -70,6 +69,8 @@
             qpid::framing::BasicHeaderProperties* getHeaderProperties();
             const string& getRoutingKey() const { return routingKey; }
             const string& getExchange() const { return exchange; }
+            u_int64_t contentSize() const{ return size; }
+
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Tue Oct 10 03:06:36 
2006
@@ -28,19 +28,18 @@
 
 Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : 
out(_out), 
                                                                        
id(_id), 
+                                                                       
prefetchCount(0),
+                                                                       
prefetchSize(0),
+                                                                       
outstandingSize(0),
                                                                        
framesize(_framesize),
                                                                        
transactional(false),
                                                                        
deliveryTag(1),
                                                                        
tagGenerator("sgen"){}
 
 Channel::~Channel(){
-    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = 
consumers.begin() ){
-        std::cout << "ERROR: Channel consumer appears not to have been 
cancelled before channel was destroyed." << std::endl;
-        delete (i->second);
-    }
 }
 
-bool Channel::exists(string& consumerTag){
+bool Channel::exists(const string& consumerTag){
     return consumers.find(consumerTag) != consumers.end();
 }
 
@@ -57,27 +56,26 @@
     }
 }
 
-void Channel::cancel(string& tag){
+void Channel::cancel(consumer_iterator i){
+    ConsumerImpl* c = i->second;
+    consumers.erase(i);
+    if(c){
+        c->cancel();
+        delete c;
+    }
+}
+
+void Channel::cancel(const string& tag){
     consumer_iterator i = consumers.find(tag);
     if(i != consumers.end()){
-        ConsumerImpl* c = i->second;
-        consumers.erase(i);
-        if(c){
-            c->cancel();
-            delete c;
-        }
+        cancel(i);
     }
 }
 
 void Channel::close(){
     //cancel all consumers
     for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = 
consumers.begin() ){
-        ConsumerImpl* c = i->second;
-        consumers.erase(i);
-        if(c){
-            c->cancel();
-            delete c;
-        }
+        cancel(i);
     }
 }
 
@@ -99,33 +97,50 @@
     u_int64_t myDeliveryTag = deliveryTag++;
     if(ackExpected){
         unacknowledged.push_back(AckRecord(msg, queue, consumerTag, 
myDeliveryTag));
+        outstandingSize += msg->contentSize();
     }
     //send deliver method, header and content(s)
     msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
 }
 
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+    Locker locker(deliveryLock);
+    bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
+    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + 
outstandingSize || unacknowledged.empty();
+    return countOk && sizeOk;
+}
+
 Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, 
                                     Queue::shared_ptr _queue, 
                                     ConnectionToken* const _connection, bool 
ack) : parent(_parent), 
                                                                                
     tag(_tag), 
                                                                                
     queue(_queue),
                                                                                
     connection(_connection),
-                                                                               
     ackExpected(ack){
+                                                                               
     ackExpected(ack), 
+                                                                               
     blocked(false){
 }
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
-    if(connection != msg->getPublisher()){
-        parent->deliver(msg, tag, queue, ackExpected);
-        return true;
-    }else{
-        return false;
+    if(connection != msg->getPublisher()){//check for no_local
+        if(ackExpected && !parent->checkPrefetch(msg)){
+            blocked = true;
+        }else{
+            blocked = false;
+            parent->deliver(msg, tag, queue, ackExpected);
+            return true;
+        }
     }
+    return false;
 }
 
 void Channel::ConsumerImpl::cancel(){
     if(queue) queue->cancel(this);
 }
 
+void Channel::ConsumerImpl::requestDispatch(){
+    if(blocked) queue->dispatch();
+}
+
 void Channel::checkMessage(const std::string& text){
     if(!message.get()){
         THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
@@ -140,20 +155,36 @@
 }
 
 void Channel::ack(u_int64_t deliveryTag, bool multiple){
+    Locker locker(deliveryLock);//need to synchronize with possible concurrent 
delivery
+    
     ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), 
MatchAck(deliveryTag));
     if(i == unacknowledged.end()){
-        //error: how should this be signalled?
-    }else if(multiple){
+        throw InvalidAckException();
+    }else if(multiple){        
         unacknowledged.erase(unacknowledged.begin(), ++i);
+        //recompute outstandingSize (might in some cases be quicker to add up 
removed size and subtract from total?):
+        outstandingSize = for_each(unacknowledged.begin(), 
unacknowledged.end(), AddSize()).getSize();
     }else{
-        unacknowledged.erase(i);
+        outstandingSize -= i->msg->contentSize();
+        unacknowledged.erase(i);        
+    }
+
+    //if the prefetch limit had previously been reached, there may
+    //be messages that can be now be delivered
+    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){
+        i->second->requestDispatch();
     }
 }
 
 void Channel::recover(bool requeue){
+    Locker locker(deliveryLock);//need to synchronize with possible concurrent 
delivery
+
     if(requeue){
-        for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
-        unacknowledged.clear();
+        outstandingSize = 0;
+        ack_iterator start(unacknowledged.begin());
+        ack_iterator end(unacknowledged.end());
+        for_each(start, end, Requeue());
+        unacknowledged.erase(start, end);
     }else{
         for_each(unacknowledged.begin(), unacknowledged.end(), 
Redeliver(this));        
     }
@@ -174,4 +205,14 @@
 
 void Channel::Redeliver::operator()(AckRecord& record) const{
     record.msg->deliver(channel->out, channel->id, record.consumerTag, 
record.deliveryTag, channel->framesize);
+}
+
+Channel::AddSize::AddSize() : size(0){}
+
+void Channel::AddSize::operator()(AckRecord& record){
+    size += record.msg->contentSize();
+}
+
+u_int32_t Channel::AddSize::getSize(){
+    return size;
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp Tue Oct 10 03:06:36 
2006
@@ -33,7 +33,8 @@
                                                      routingKey(_routingKey), 
                                                      mandatory(_mandatory),
                                                      immediate(_immediate),
-                                                     redelivered(false){
+                                                     redelivered(false),
+                                                     size(0){
 
 }
 
@@ -46,6 +47,7 @@
 
 void Message::addContent(AMQContentBody::shared_ptr data){
     content.push_back(data);
+    size += data->size();    
 }
 
 bool Message::isComplete(){
@@ -76,14 +78,6 @@
 
 BasicHeaderProperties* Message::getHeaderProperties(){
     return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-u_int64_t Message::contentSize(){
-    u_int64_t size(0);
-    for(content_iterator i = content.begin(); i != content.end(); i++){
-        size += (*i)->size();
-    }
-    return size;
 }
 
 const ConnectionToken* const Message::getPublisher(){

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Tue Oct 10 
03:06:36 2006
@@ -385,7 +385,11 @@
 void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t 
ticket, string& queue, bool noAck){} 
         
 void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t 
deliveryTag, bool multiple){
-    parent->getChannel(channel)->ack(deliveryTag, multiple);
+    try{
+        parent->getChannel(channel)->ack(deliveryTag, multiple);
+    }catch(InvalidAckException& e){
+        throw ConnectionException(530, "Received ack for unrecognised delivery 
tag");
+    }
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t 
deliveryTag, bool requeue){} 

Modified: incubator/qpid/trunk/qpid/cpp/broker/test/ChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/ChannelTest.cpp?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/ChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/ChannelTest.cpp Tue Oct 10 
03:06:36 2006
@@ -24,23 +24,24 @@
 #include <iostream>
 #include <memory>
 
+using namespace std::tr1;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::concurrent;
 
-struct MessageHolder{
+struct DummyRouter{
     Message::shared_ptr last;
-};
-
-class DummyRouter{
-    MessageHolder& holder;
 
-public:
-    DummyRouter(MessageHolder& _holder) : holder(_holder){
+    void operator()(Message::shared_ptr& msg){
+       last = msg;
     }
+};
 
-    void operator()(Message::shared_ptr& msg){
-        holder.last = msg;
+struct DummyHandler : OutputHandler{
+    std::vector<AMQFrame*> frames; 
+
+    virtual void send(AMQFrame* frame){
+        frames.push_back(frame);
     }
 };
 
@@ -49,6 +50,8 @@
 {
     CPPUNIT_TEST_SUITE(ChannelTest);
     CPPUNIT_TEST(testIncoming);
+    CPPUNIT_TEST(testConsumerMgmt);
+    CPPUNIT_TEST(testDeliveryNoAck);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -64,18 +67,99 @@
         AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
         AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
 
-        MessageHolder holder;
-        channel.handleHeader(header, DummyRouter(holder));
-        CPPUNIT_ASSERT(!holder.last);
-        channel.handleContent(part1, DummyRouter(holder));
-        CPPUNIT_ASSERT(!holder.last);
-        channel.handleContent(part2, DummyRouter(holder));
-        CPPUNIT_ASSERT(holder.last);
-        CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey());
+        CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
+        CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
+        DummyRouter router = channel.handleContent(part2, DummyRouter());
+        CPPUNIT_ASSERT(router.last);
+        CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
+    }
+
+    void testConsumerMgmt(){
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        Channel channel(0, 0, 0);
+        CPPUNIT_ASSERT(!channel.exists("my_consumer"));
+
+        ConnectionToken* owner;
+        string tag("my_consumer");
+        channel.consume(tag, queue, false, false, owner);
+        string tagA;
+        string tagB;
+        channel.consume(tagA, queue, false, false, owner);
+        channel.consume(tagB, queue, false, false, owner);
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount());
+        CPPUNIT_ASSERT(channel.exists("my_consumer"));
+        CPPUNIT_ASSERT(channel.exists(tagA));
+        CPPUNIT_ASSERT(channel.exists(tagB));
+        channel.cancel(tagA);
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount());
+        CPPUNIT_ASSERT(channel.exists("my_consumer"));
+        CPPUNIT_ASSERT(!channel.exists(tagA));
+        CPPUNIT_ASSERT(channel.exists(tagB));
+        channel.close();
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount());        
+    }
+
+    void testDeliveryNoAck(){
+        DummyHandler handler;
+        Channel channel(&handler, 7, 10000);
+
+        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);
+        msg->setHeader(header);
+        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+        msg->addContent(body);
+
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        ConnectionToken* owner;
+        string tag("no_ack");
+        channel.consume(tag, queue, false, false, owner);
+
+        queue->deliver(msg);
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+        BasicDeliverBody::shared_ptr 
deliver(dynamic_pointer_cast<BasicDeliverBody, 
AMQBody>(handler.frames[0]->getBody()));
+        AMQHeaderBody::shared_ptr 
contentHeader(dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(handler.frames[1]->getBody()));
+        AMQContentBody::shared_ptr 
contentBody(dynamic_pointer_cast<AMQContentBody, 
AMQBody>(handler.frames[2]->getBody()));
+        CPPUNIT_ASSERT(deliver);
+        CPPUNIT_ASSERT(contentHeader);
+        CPPUNIT_ASSERT(contentBody);
+        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
+    }
+
+    void testDeliveryAndRecovery(){
+        DummyHandler handler;
+        Channel channel(&handler, 7, 10000);
+
+        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);
+        msg->setHeader(header);
+        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+        msg->addContent(body);
+
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        ConnectionToken* owner;
+        string tag("ack");
+        channel.consume(tag, queue, true, false, owner);
+
+        queue->deliver(msg);
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+        BasicDeliverBody::shared_ptr 
deliver(dynamic_pointer_cast<BasicDeliverBody, 
AMQBody>(handler.frames[0]->getBody()));
+        AMQHeaderBody::shared_ptr 
contentHeader(dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(handler.frames[1]->getBody()));
+        AMQContentBody::shared_ptr 
contentBody(dynamic_pointer_cast<AMQContentBody, 
AMQBody>(handler.frames[2]->getBody()));
+        CPPUNIT_ASSERT(deliver);
+        CPPUNIT_ASSERT(contentHeader);
+        CPPUNIT_ASSERT(contentBody);
+        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
     }
 };
 
 // Make this test suite a plugin.
 CPPUNIT_PLUGIN_IMPLEMENT();
 CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest);
-

Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h Tue Oct 
10 03:06:36 2006
@@ -33,7 +33,7 @@
     typedef std::tr1::shared_ptr<AMQContentBody> shared_ptr;
 
     AMQContentBody();
-    AMQContentBody(string& data);
+    AMQContentBody(const string& data);
     inline virtual ~AMQContentBody(){}
     inline u_int8_t type() const { return CONTENT_BODY; };
     inline string& getData(){ return data; }

Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp Tue Oct 
10 03:06:36 2006
@@ -21,7 +21,7 @@
 qpid::framing::AMQContentBody::AMQContentBody(){
 }
 
-qpid::framing::AMQContentBody::AMQContentBody(string& _data) : data(_data){
+qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : 
data(_data){
 }
 
 u_int32_t qpid::framing::AMQContentBody::size() const{

Propchange: incubator/qpid/trunk/qpid/python/qpid/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Oct 10 03:06:36 2006
@@ -0,0 +1 @@
+*.pyc

Propchange: incubator/qpid/trunk/qpid/python/tests/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Oct 10 03:06:36 2006
@@ -0,0 +1 @@
+*.pyc

Modified: incubator/qpid/trunk/qpid/python/tests/basic.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/basic.py?view=diff&rev=454677&r1=454676&r2=454677
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/basic.py Tue Oct 10 03:06:36 2006
@@ -113,7 +113,7 @@
         except Closed, e:
             self.assertConnectionException(530, e.args[0])
 
-    def test_basic_cancel(self):
+    def test_cancel(self):
         """
         Test compliance of the basic.cancel method
         """
@@ -139,7 +139,7 @@
         channel.basic_cancel(consumer_tag="this-never-existed")
 
 
-    def test_basic_ack(self):
+    def test_ack(self):
         """
         Test basic ack/recover behaviour
         """
@@ -183,7 +183,7 @@
             self.fail("Got unexpected message: " + extra.content.body)
         except Empty: None
 
-    def test_basic_recover_requeue(self):
+    def test_recover_requeue(self):
         """
         Test requeing on recovery
         """
@@ -238,3 +238,94 @@
             self.fail("Got unexpected message in original queue: " + 
extra.content.body)
         except Empty: None
         
+        
+    def test_qos_prefetch_count(self):
+        """
+        Test that the prefetch count specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-count", 
no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 5:
+        channel.basic_qos(prefetch_count=5)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-count", 
content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered:
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + 
extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + 
extra.content.body)
+        except Empty: None
+
+
+        
+    def test_qos_prefetch_size(self):
+        """
+        Test that the prefetch size specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-size", 
no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+        channel.basic_qos(prefetch_size=50)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-size", 
content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered (i.e. 45 bytes worth):
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + 
extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + 
extra.content.body)
+        except Empty: None
+
+        #make sure that a single oversized message still gets delivered
+        large = "abcdefghijklmnopqrstuvwxyz"
+        large = large + "-" + large;
+        channel.basic_publish(routing_key="test-prefetch-size", 
content=Content(large))
+        msg = queue.get(timeout=1)
+        self.assertEqual(large, msg.content.body)


Reply via email to