Author: gsim
Date: Wed Oct 11 01:24:42 2006
New Revision: 462729
URL: http://svn.apache.org/viewvc?view=rev&rev=462729
Log:
Implementation of basic_get.
Added:
incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp
- copied, changed from r454649,
incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
Removed:
incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/python/tests/basic.py
incubator/qpid/trunk/qpid/python/tests/broker.py
Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h Wed Oct 11 01:24:42 2006
@@ -60,12 +60,24 @@
Queue::shared_ptr queue;
string consumerTag;
u_int64_t deliveryTag;
+ bool pull;
- AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue,
- string _consumerTag, u_int64_t _deliveryTag) :
msg(_msg),
-
queue(_queue),
-
consumerTag(_consumerTag),
-
deliveryTag(_deliveryTag){}
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+
consumerTag(_consumerTag),
+
deliveryTag(_deliveryTag),
+ pull(false){}
+
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+
deliveryTag(_deliveryTag),
+ pull(true){}
};
typedef std::vector<AckRecord>::iterator ack_iterator;
@@ -89,12 +101,14 @@
void operator()(AckRecord& record) const;
};
- class AddSize{
+ class CalculatePrefetch{
u_int32_t size;
+ u_int16_t count;
public:
- AddSize();
+ CalculatePrefetch();
void operator()(AckRecord& record);
u_int32_t getSize();
+ u_int16_t getCount();
};
const int id;
@@ -106,6 +120,7 @@
u_int32_t prefetchSize;
u_int16_t prefetchCount;
u_int32_t outstandingSize;
+ u_int16_t outstandingCount;
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
@@ -136,6 +151,7 @@
bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool
exclusive, ConnectionToken* const connection = 0);
void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, bool ackExpected);
void begin();
void close();
void commit();
Modified: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h Wed Oct 11 01:24:42 2006
@@ -49,6 +49,9 @@
content_list content;
u_int64_t size;
+ void sendContent(qpid::framing::OutputHandler* out,
+ int channel, u_int32_t framesize);
+
public:
typedef std::tr1::shared_ptr<Message> shared_ptr;
@@ -61,9 +64,16 @@
bool isComplete();
const ConnectionToken* const getPublisher();
- void deliver(qpid::framing::OutputHandler* out, int channel,
- string& consumerTag, u_int64_t deliveryTag,
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
u_int32_t framesize);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Wed Oct 11 01:24:42
2006
@@ -31,6 +31,7 @@
prefetchCount(0),
prefetchSize(0),
outstandingSize(0),
+
outstandingCount(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
@@ -98,6 +99,7 @@
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag,
myDeliveryTag));
outstandingSize += msg->contentSize();
+ outstandingCount++;
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
@@ -162,10 +164,15 @@
throw InvalidAckException();
}else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute outstandingSize (might in some cases be quicker to add up
removed size and subtract from total?):
- outstandingSize = for_each(unacknowledged.begin(),
unacknowledged.end(), AddSize()).getSize();
+ //recompute prefetch outstanding (note: messages delivered through get
are ignored)
+ CalculatePrefetch calc(for_each(unacknowledged.begin(),
unacknowledged.end(), CalculatePrefetch()));
+ outstandingSize = calc.getSize();
+ outstandingCount = calc.getCount();
}else{
- outstandingSize -= i->msg->contentSize();
+ if(!i->pull){
+ outstandingSize -= i->msg->contentSize();
+ outstandingCount--;
+ }
unacknowledged.erase(i);
}
@@ -181,6 +188,7 @@
if(requeue){
outstandingSize = 0;
+ outstandingCount = 0;
ack_iterator start(unacknowledged.begin());
ack_iterator end(unacknowledged.end());
for_each(start, end, Requeue());
@@ -190,6 +198,21 @@
}
}
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Locker locker(deliveryLock);
+ u_int64_t myDeliveryTag = deliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag,
framesize);
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
bool Channel::MatchAck::operator()(AckRecord& record) const{
@@ -204,15 +227,29 @@
Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
- record.msg->deliver(channel->out, channel->id, record.consumerTag,
record.deliveryTag, channel->framesize);
+ if(record.pull){
+ //if message was originally sent as response to get, we must requeue it
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+ }else{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag,
record.deliveryTag, channel->framesize);
+ }
}
-Channel::AddSize::AddSize() : size(0){}
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-void Channel::AddSize::operator()(AckRecord& record){
- size += record.msg->contentSize();
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+ if(!record.pull){
+ //ignore messages that were sent in response to get when calculating
prefetch
+ size += record.msg->contentSize();
+ count++;
+ }
}
-u_int32_t Channel::AddSize::getSize(){
+u_int32_t Channel::CalculatePrefetch::getSize(){
return size;
+}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+ return count;
}
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp Wed Oct 11 01:24:42
2006
@@ -59,10 +59,24 @@
}
void Message::deliver(OutputHandler* out, int channel,
- string& consumerTag, u_int64_t deliveryTag,
+ const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag,
deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag,
redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t
framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody,
AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
for(content_iterator i = content.begin(); i != content.end(); i++){
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp Wed Oct 11 01:24:42 2006
@@ -122,7 +122,13 @@
}
Message::shared_ptr Queue::dequeue(){
-
+ Locker locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
}
u_int32_t Queue::purge(){
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Wed Oct 11
01:24:42 2006
@@ -338,7 +338,6 @@
void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t
prefetchSize, u_int16_t prefetchCount, bool global){
//TODO: handle global
- //TODO: channel doesn't do anything with these qos parameters yet
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
parent->client.getBasic().qosOk(channel);
@@ -349,7 +348,6 @@
bool noLocal, bool noAck,
bool exclusive,
bool nowait){
- //TODO: implement nolocal
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
Channel* channel = parent->channels[channelId];
if(!consumerTag.empty() && channel->exists(consumerTag)){
@@ -382,7 +380,13 @@
parent->getChannel(channel)->handlePublish(msg);
}
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t
ticket, string& queue, bool noAck){}
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t
ticket, string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+ parent->client.getBasic().getEmpty(channelId, clusterId);
+ }
+}
void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t
deliveryTag, bool multiple){
try{
Copied: incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp (from r454649,
incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp?view=diff&rev=462729&p1=incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp&r1=454649&p2=incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/QueueTest.cpp Wed Oct 11 01:24:42
2006
@@ -47,12 +47,14 @@
class QueueTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(QueueTest);
- CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST(testConsumers);
+ CPPUNIT_TEST(testBinding);
+ CPPUNIT_TEST(testRegistry);
+ CPPUNIT_TEST(testDequeue);
CPPUNIT_TEST_SUITE_END();
public:
- void testMe()
- {
+ void testConsumers(){
Queue::shared_ptr queue(new Queue("my_queue", true, true));
//Test adding consumers:
@@ -82,7 +84,10 @@
CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
queue->cancel(&c2);
CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+ }
+ void testBinding(){
+ Queue::shared_ptr queue(new Queue("my_queue", true, true));
//Test bindings:
TestBinding a;
TestBinding b;
@@ -93,7 +98,9 @@
CPPUNIT_ASSERT(a.isCancelled());
CPPUNIT_ASSERT(b.isCancelled());
+ }
+ void testRegistry(){
//Test use of queues in registry:
QueueRegistry registry;
registry.declare("queue1", true, true);
@@ -111,6 +118,40 @@
CPPUNIT_ASSERT(!registry.find("queue1"));
CPPUNIT_ASSERT(!registry.find("queue2"));
CPPUNIT_ASSERT(!registry.find("queue3"));
+ }
+
+ void testDequeue(){
+ Queue::shared_ptr queue(new Queue("my_queue", true, true));
+
+ Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e",
"A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e",
"B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e",
"C", true, true));
+ Message::shared_ptr received;
+
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
+
+ TestConsumer consumer;
+ queue->consume(&consumer);
+ queue->dispatch();
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT(!received);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
}
};
Modified: incubator/qpid/trunk/qpid/python/tests/basic.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/basic.py?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/basic.py Wed Oct 11 01:24:42 2006
@@ -329,3 +329,64 @@
channel.basic_publish(routing_key="test-prefetch-size",
content=Content(large))
msg = queue.get(timeout=1)
self.assertEqual(large, msg.content.body)
+
+ def test_get(self):
+ """
+ Test basic_get method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-get", exclusive=True)
+
+ #publish some messages (no_ack=True)
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-get",
content=Content("Message %d" % i))
+
+ #use basic_get to read back the messages, and check that we get an
empty at the end
+ for i in range(1, 11):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #repeat for no_ack=False
+ for i in range(11, 21):
+ channel.basic_publish(routing_key="test-get",
content=Content("Message %d" % i))
+
+ for i in range(11, 21):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ if(i == 13):
+ channel.basic_ack(delivery_tag=reply.delivery_tag,
multiple=True)
+ if(i in [15, 17, 19]):
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #recover(requeue=True)
+ channel.basic_recover(requeue=True)
+
+ #get the unacked messages again (14, 16, 18, 20)
+ for i in [14, 16, 18, 20]:
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ channel.basic_recover(requeue=True)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
Modified: incubator/qpid/trunk/qpid/python/tests/broker.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/broker.py?view=diff&rev=462729&r1=462728&r2=462729
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/broker.py Wed Oct 11 01:24:42 2006
@@ -45,8 +45,6 @@
msg = self.client.queue(ctag).get(timeout = 5)
ch.basic_ack(delivery_tag = msg.delivery_tag)
self.assert_(msg.content.body == body)
-
- # TODO: Ensure we get a failure if an ack consumer doesn't ack.
def test_basic_delivery_immediate(self):
"""