Author: gsim
Date: Wed Oct 11 01:24:42 2006
New Revision: 462729

URL: http://svn.apache.org/viewvc?view=rev&rev=462729
Log:
Implementation of basic_get.


Added:
    incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp
      - copied, changed from r454649, incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
Removed:
    incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
    incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
    incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/python/tests/basic.py
    incubator/qpid/trunk/qpid/python/tests/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h Wed Oct 11 01:24:42 2006
@@ -60,12 +60,24 @@
                 Queue::shared_ptr queue;
                 string consumerTag;
                 u_int64_t deliveryTag;
+                bool pull;
 
-                AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, 
-                          string _consumerTag, u_int64_t _deliveryTag) : 
msg(_msg), 
-                                                                        
queue(_queue), 
-                                                                        
consumerTag(_consumerTag),
-                                                                        
deliveryTag(_deliveryTag){}
+                AckRecord(Message::shared_ptr _msg, 
+                          Queue::shared_ptr _queue, 
+                          const string _consumerTag, 
+                          const u_int64_t _deliveryTag) : msg(_msg), 
+                                                          queue(_queue), 
+                                                          
consumerTag(_consumerTag),
+                                                          
deliveryTag(_deliveryTag),
+                                                          pull(false){}
+
+                AckRecord(Message::shared_ptr _msg, 
+                          Queue::shared_ptr _queue, 
+                          const u_int64_t _deliveryTag) : msg(_msg), 
+                                                          queue(_queue), 
+                                                          consumerTag(""),
+                                                          
deliveryTag(_deliveryTag),
+                                                          pull(true){}
             };
 
             typedef std::vector<AckRecord>::iterator ack_iterator; 
@@ -89,12 +101,14 @@
                 void operator()(AckRecord& record) const;
             };
 
-            class AddSize{
+            class CalculatePrefetch{
                 u_int32_t size;
+                u_int16_t count;
             public:
-                AddSize();
+                CalculatePrefetch();
                 void operator()(AckRecord& record);
                 u_int32_t getSize();
+                u_int16_t getCount();
             };
 
             const int id;
@@ -106,6 +120,7 @@
             u_int32_t prefetchSize;    
             u_int16_t prefetchCount;    
             u_int32_t outstandingSize;    
+            u_int16_t outstandingCount;    
             u_int32_t framesize;
             Message::shared_ptr message;
             NameGenerator tagGenerator;
@@ -136,6 +151,7 @@
             bool exists(const string& consumerTag);
             void consume(string& tag, Queue::shared_ptr queue, bool acks, bool 
exclusive, ConnectionToken* const connection = 0);
             void cancel(const string& tag);
+            bool get(Queue::shared_ptr queue, bool ackExpected);
             void begin();
             void close();
             void commit();

Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h Wed Oct 11 01:24:42 2006
@@ -49,6 +49,9 @@
             content_list content;
             u_int64_t size;
 
+            void sendContent(qpid::framing::OutputHandler* out, 
+                             int channel, u_int32_t framesize);
+
         public:
             typedef std::tr1::shared_ptr<Message> shared_ptr;
 
@@ -61,9 +64,16 @@
             bool isComplete();
             const ConnectionToken* const getPublisher();
 
-            void deliver(qpid::framing::OutputHandler* out, int channel, 
-                         string& consumerTag, u_int64_t deliveryTag, 
+            void deliver(qpid::framing::OutputHandler* out, 
+                         int channel, 
+                         const string& consumerTag, 
+                         u_int64_t deliveryTag, 
                          u_int32_t framesize);
+            void sendGetOk(qpid::framing::OutputHandler* out, 
+                           int channel, 
+                           u_int32_t messageCount,
+                           u_int64_t deliveryTag, 
+                           u_int32_t framesize);
             void redeliver();
 
             qpid::framing::BasicHeaderProperties* getHeaderProperties();

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Wed Oct 11 01:24:42 
2006
@@ -31,6 +31,7 @@
                                                                        
prefetchCount(0),
                                                                        
prefetchSize(0),
                                                                        
outstandingSize(0),
+                                                                       
outstandingCount(0),
                                                                        
framesize(_framesize),
                                                                        
transactional(false),
                                                                        
deliveryTag(1),
@@ -98,6 +99,7 @@
     if(ackExpected){
         unacknowledged.push_back(AckRecord(msg, queue, consumerTag, 
myDeliveryTag));
         outstandingSize += msg->contentSize();
+        outstandingCount++;
     }
     //send deliver method, header and content(s)
     msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
@@ -162,10 +164,15 @@
         throw InvalidAckException();
     }else if(multiple){        
         unacknowledged.erase(unacknowledged.begin(), ++i);
-        //recompute outstandingSize (might in some cases be quicker to add up 
removed size and subtract from total?):
-        outstandingSize = for_each(unacknowledged.begin(), 
unacknowledged.end(), AddSize()).getSize();
+        //recompute prefetch outstanding (note: messages delivered through get 
are ignored)
+        CalculatePrefetch calc(for_each(unacknowledged.begin(), 
unacknowledged.end(), CalculatePrefetch()));
+        outstandingSize = calc.getSize();
+        outstandingCount = calc.getCount();
     }else{
-        outstandingSize -= i->msg->contentSize();
+        if(!i->pull){
+            outstandingSize -= i->msg->contentSize();
+            outstandingCount--;
+        }
         unacknowledged.erase(i);        
     }
 
@@ -181,6 +188,7 @@
 
     if(requeue){
         outstandingSize = 0;
+        outstandingCount = 0;
         ack_iterator start(unacknowledged.begin());
         ack_iterator end(unacknowledged.end());
         for_each(start, end, Requeue());
@@ -190,6 +198,21 @@
     }
 }
 
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ //returns true if a message was dequeued and sent, false if the queue was empty
+    Message::shared_ptr msg = queue->dequeue();
+    if(msg){
+        Locker locker(deliveryLock);
+        u_int64_t myDeliveryTag = deliveryTag++;
+        msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, 
framesize); //NOTE(review): count is read after dequeue(), so the +1 appears to include the message being sent; AMQP get-ok message-count is believed to exclude it - confirm
+        if(ackExpected){
+            unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); //three-argument AckRecord sets pull=true; outstandingSize/outstandingCount are not touched here
+        }
+        return true;
+    }else{
+        return false;
+    }
+}
+
 Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
 
 bool Channel::MatchAck::operator()(AckRecord& record) const{
@@ -204,15 +227,29 @@
 Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
 
 void Channel::Redeliver::operator()(AckRecord& record) const{
-    record.msg->deliver(channel->out, channel->id, record.consumerTag, 
record.deliveryTag, channel->framesize);
+    if(record.pull){
+        //if message was originally sent as response to get, we must requeue it
+        record.msg->redeliver();
+        record.queue->deliver(record.msg);
+    }else{
+        record.msg->deliver(channel->out, channel->id, record.consumerTag, 
record.deliveryTag, channel->framesize);
+    }
 }
 
-Channel::AddSize::AddSize() : size(0){}
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0), count(0){} //zero both totals: count is incremented in operator() and read back via getCount()
 
-void Channel::AddSize::operator()(AckRecord& record){
-    size += record.msg->contentSize();
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+    if(!record.pull){
+        //ignore messages that were sent in response to get when calculating 
prefetch
+        size += record.msg->contentSize();
+        count++;
+    }
 }
 
-u_int32_t Channel::AddSize::getSize(){
+u_int32_t Channel::CalculatePrefetch::getSize(){
     return size;
+}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+    return count;
 }

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp Wed Oct 11 01:24:42 
2006
@@ -59,10 +59,24 @@
 }
 
 void Message::deliver(OutputHandler* out, int channel, 
-                      string& consumerTag, u_int64_t deliveryTag, 
+                      const string& consumerTag, u_int64_t deliveryTag, 
                       u_int32_t framesize){
 
     out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, 
deliveryTag, redelivered, exchange, routingKey)));
+    sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out, 
+         int channel, 
+         u_int32_t messageCount,
+         u_int64_t deliveryTag, 
+         u_int32_t framesize){
+
+    out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, 
redelivered, exchange, routingKey, messageCount)));
+    sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t 
framesize){
     AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, 
AMQHeaderBody>(header);
     out->send(new AMQFrame(channel, headerBody));
     for(content_iterator i = content.begin(); i != content.end(); i++){

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp Wed Oct 11 01:24:42 2006
@@ -122,7 +122,13 @@
 }
 
 Message::shared_ptr Queue::dequeue(){
-
+    Locker locker(lock);
+    Message::shared_ptr msg;
+    if(!messages.empty()){
+        msg = messages.front();
+        messages.pop();
+    }
+    return msg;
 }
 
 u_int32_t Queue::purge(){

Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Wed Oct 11 
01:24:42 2006
@@ -338,7 +338,6 @@
 
 void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t 
prefetchSize, u_int16_t prefetchCount, bool global){
     //TODO: handle global
-    //TODO: channel doesn't do anything with these qos parameters yet
     parent->getChannel(channel)->setPrefetchSize(prefetchSize);
     parent->getChannel(channel)->setPrefetchCount(prefetchCount);
     parent->client.getBasic().qosOk(channel);
@@ -349,7 +348,6 @@
                                                    bool noLocal, bool noAck, 
bool exclusive, 
                                                    bool nowait){
     
-    //TODO: implement nolocal
     Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
     Channel* channel = parent->channels[channelId];
     if(!consumerTag.empty() && channel->exists(consumerTag)){
@@ -382,7 +380,13 @@
     parent->getChannel(channel)->handlePublish(msg);
 } 
         
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t 
ticket, string& queue, bool noAck){} 
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t 
ticket, string& queueName, bool noAck){
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
+    if(!parent->getChannel(channelId)->get(queue, !noAck)){
+        string clusterId;//not used, part of an imatix hack
+        parent->client.getBasic().getEmpty(channelId, clusterId);
+    }
+} 
         
 void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t 
deliveryTag, bool multiple){
     try{

Copied: incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp (from r454649, incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp?view=diff&rev=462729&p1=incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp&r1=454649&p2=incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp Wed Oct 11 01:24:42 
2006
@@ -47,12 +47,14 @@
 class QueueTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(QueueTest);
-    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST(testConsumers);
+    CPPUNIT_TEST(testBinding);
+    CPPUNIT_TEST(testRegistry);
+    CPPUNIT_TEST(testDequeue);
     CPPUNIT_TEST_SUITE_END();
 
   public:
-    void testMe() 
-    {
+    void testConsumers(){
         Queue::shared_ptr queue(new Queue("my_queue", true, true));
     
         //Test adding consumers:
@@ -82,7 +84,10 @@
         CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
         queue->cancel(&c2);
         CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+    }
 
+    void testBinding(){
+        Queue::shared_ptr queue(new Queue("my_queue", true, true));
         //Test bindings:
         TestBinding a;
         TestBinding b;
@@ -93,7 +98,9 @@
 
         CPPUNIT_ASSERT(a.isCancelled());
         CPPUNIT_ASSERT(b.isCancelled());
+    }
 
+    void testRegistry(){
         //Test use of queues in registry:
         QueueRegistry registry;
         registry.declare("queue1", true, true);
@@ -111,6 +118,40 @@
         CPPUNIT_ASSERT(!registry.find("queue1"));
         CPPUNIT_ASSERT(!registry.find("queue2"));
         CPPUNIT_ASSERT(!registry.find("queue3"));
+    }
+
+    void testDequeue(){
+        Queue::shared_ptr queue(new Queue("my_queue", true, true));
+
+        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", 
"A", true, true));
+        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", 
"B", true, true));
+        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", 
"C", true, true));
+        Message::shared_ptr received;
+
+        queue->deliver(msg1);
+        queue->deliver(msg2);
+        queue->deliver(msg3);
+
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
+        
+        received = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
+
+        received = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
+
+        TestConsumer consumer; 
+        queue->consume(&consumer);
+        queue->dispatch();
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+        received = queue->dequeue();
+        CPPUNIT_ASSERT(!received);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+        
     }
 };
 

Modified: incubator/qpid/trunk/qpid/python/tests/basic.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/basic.py?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/basic.py Wed Oct 11 01:24:42 2006
@@ -329,3 +329,64 @@
         channel.basic_publish(routing_key="test-prefetch-size", 
content=Content(large))
         msg = queue.get(timeout=1)
         self.assertEqual(large, msg.content.body)
+
+    def test_get(self):
+        """
+        Test basic_get method
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-get", exclusive=True)
+        
+        #publish some messages (no_ack=True)
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-get", 
content=Content("Message %d" % i))
+
+        #use basic_get to read back the messages, and check that we get an 
empty at the end
+        for i in range(1, 11):
+            reply = channel.basic_get(no_ack=True)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #repeat for no_ack=False
+        for i in range(11, 21):
+            channel.basic_publish(routing_key="test-get", 
content=Content("Message %d" % i))
+
+        for i in range(11, 21):
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            if(i == 13):
+                channel.basic_ack(delivery_tag=reply.delivery_tag, 
multiple=True)
+            if(i in [15, 17, 19]):
+                channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #recover(requeue=True)
+        channel.basic_recover(requeue=True)
+        
+        #get the unacked messages again (14, 16, 18, 20)
+        for i in [14, 16, 18, 20]:
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        channel.basic_recover(requeue=True)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")

Modified: incubator/qpid/trunk/qpid/python/tests/broker.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/broker.py?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/broker.py Wed Oct 11 01:24:42 2006
@@ -45,8 +45,6 @@
         msg = self.client.queue(ctag).get(timeout = 5)
         ch.basic_ack(delivery_tag = msg.delivery_tag)
         self.assert_(msg.content.body == body)
-
-        # TODO: Ensure we get a failure if an ack consumer doesn't ack.
         
     def test_basic_delivery_immediate(self):
         """


Reply via email to