Author: gsim
Date: Tue Oct 31 10:33:40 2006
New Revision: 469599
URL: http://svn.apache.org/viewvc?view=rev&rev=469599
Log:
Hid locking within exchange registry, switched to shared_ptr for exchanges,
added some extra error handling and tests.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/python/tests/exchange.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Tue Oct 31
10:33:40 2006
@@ -155,7 +155,7 @@
if(blocked) queue->dispatch();
}
-void Channel::handlePublish(Message* _message, Exchange* _exchange){
+void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
Message::shared_ptr message(_message);
exchange = _exchange;
messageBuilder.initialise(message);
@@ -179,7 +179,7 @@
DeliverableMessage deliverable(msg);
exchange->route(deliverable, msg->getRoutingKey(),
&(msg->getHeaderProperties()->getHeaders()));
}
- exchange = 0;
+ exchange.reset();
}else{
std::cout << "Exchange not known in
Channel::complete(Message::shared_ptr&)" << std::endl;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Tue Oct 31 10:33:40
2006
@@ -82,7 +82,7 @@
AccumulatedAck accumulatedAck;
TransactionalStore* store;
MessageBuilder messageBuilder;//builder for in-progress message
- Exchange* exchange;//exchange to which any in-progress message was
published to
+ Exchange::shared_ptr exchange;//exchange to which any in-progress
message was published to
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, string& tag,
Queue::shared_ptr& queue, bool ackExpected);
@@ -107,7 +107,7 @@
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag,
u_int64_t deliveryTag);
- void handlePublish(Message* msg, Exchange* exchange);
+ void handlePublish(Message* msg, Exchange::shared_ptr exchange);
void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void handleContent(qpid::framing::AMQContentBody::shared_ptr
content);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Tue Oct 31
10:33:40 2006
@@ -36,6 +36,8 @@
static const std::string typeName;
DirectExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const std::string&
routingKey, qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Oct 31
10:33:40 2006
@@ -18,23 +18,27 @@
#ifndef _Exchange_
#define _Exchange_
+#include <boost/shared_ptr.hpp>
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
-namespace broker {
- class Exchange{
- const std::string name;
- public:
- explicit Exchange(const std::string& _name) : name(_name) {}
- virtual ~Exchange(){}
- std::string getName() { return name; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey,
qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey,
qpid::framing::FieldTable* args) = 0;
- virtual void route(Deliverable& msg, const string& routingKey,
qpid::framing::FieldTable* args) = 0;
- };
-}
+ namespace broker {
+ class Exchange{
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Exchange> shared_ptr;
+
+ explicit Exchange(const std::string& _name) : name(_name){}
+ virtual ~Exchange(){}
+ std::string getName() { return name; }
+ virtual std::string getType() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string&
routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string&
routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey,
qpid::framing::FieldTable* args) = 0;
+ };
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Oct
31 10:33:40 2006
@@ -16,33 +16,46 @@
*
*/
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/TopicExchange.h"
using namespace qpid::broker;
using namespace qpid::concurrent;
+using std::pair;
-ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
-
-ExchangeRegistry::~ExchangeRegistry(){
- for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end();
++i)
- {
- delete i->second;
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name,
const string& type) throw(UnknownExchangeTypeException){
+ Locker locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ Exchange::shared_ptr exchange;
+
+ if(type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ }else{
+ throw UnknownExchangeTypeException();
+ }
+ exchanges[name] = exchange;
+ return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
- delete lock;
-}
-
-void ExchangeRegistry::declare(Exchange* exchange){
- exchanges[exchange->getName()] = exchange;
}
void ExchangeRegistry::destroy(const string& name){
- if(exchanges[name]){
- delete exchanges[name];
- exchanges.erase(name);
- }
+ Locker locker(lock);
+ exchanges.erase(name);
}
-Exchange* ExchangeRegistry::get(const string& name){
+Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Locker locker(lock);
return exchanges[name];
}
@@ -51,7 +64,7 @@
const std::string empty;
}
-Exchange* ExchangeRegistry::getDefault()
+Exchange::shared_ptr ExchangeRegistry::getDefault()
{
return get(empty);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Oct 31
10:33:40 2006
@@ -20,22 +20,21 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/MonitorImpl.h"
namespace qpid {
namespace broker {
+ struct UnknownExchangeTypeException{};
+
class ExchangeRegistry{
- typedef std::map<string, Exchange*> ExchangeMap;
+ typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::Monitor* lock;
+ qpid::concurrent::MonitorImpl lock;
public:
- ExchangeRegistry();
- void declare(Exchange* exchange);
+ std::pair<Exchange::shared_ptr, bool> declare(const string& name,
const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
- Exchange* get(const string& name);
- Exchange* getDefault();
- inline qpid::concurrent::Monitor* getLock(){ return lock; }
- ~ExchangeRegistry();
+ Exchange::shared_ptr get(const string& name);
+ Exchange::shared_ptr getDefault();
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Tue Oct 31
10:33:40 2006
@@ -37,6 +37,8 @@
static const std::string typeName;
FanOutExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const std::string& routingKey,
qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Tue Oct 31
10:33:40 2006
@@ -40,6 +40,8 @@
static const std::string typeName;
HeadersExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey,
qpid::framing::FieldTable* args);
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
Tue Oct 31 10:33:40 2006
@@ -33,11 +33,11 @@
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
timeout(_timeout), cleaner(&queues, timeout/10){
- exchanges.declare(new DirectExchange(empty)); // Default exchange.
- exchanges.declare(new DirectExchange(amq_direct));
- exchanges.declare(new TopicExchange(amq_topic));
- exchanges.declare(new FanOutExchange(amq_fanout));
- exchanges.declare(new HeadersExchange(amq_match));
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
cleaner.start();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Tue
Oct 31 10:33:40 2006
@@ -73,11 +73,8 @@
}
-Exchange* SessionHandlerImpl::findExchange(const string& name){
- exchanges->getLock()->acquire();
- Exchange* exchange(exchanges->get(name));
- exchanges->getLock()->release();
- return exchange;
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+ return exchanges->get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -217,40 +214,31 @@
bool passive, bool
/*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
FieldTable&
/*arguments*/){
- if(!passive && (
- type != TopicExchange::typeName &&
- type != DirectExchange::typeName &&
- type != FanOutExchange::typeName &&
- type != HeadersExchange::typeName
- )
- )
- {
- throw ChannelException(540, "Exchange type not implemented: " + type);
- }
-
- parent->exchanges->getLock()->acquire();
- if(!parent->exchanges->get(exchange)){
- if(type == TopicExchange::typeName){
- parent->exchanges->declare(new TopicExchange(exchange));
- }else if(type == DirectExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if(type == FanOutExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if (type == HeadersExchange::typeName) {
- parent->exchanges->declare(new HeadersExchange(exchange));
+ if(passive){
+ if(!parent->exchanges->get(exchange)){
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response =
parent->exchanges->declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(507, "Exchange already declared to
be of type "
+ + response.first->getType() + ",
requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(503, "Exchange type not implemented: " +
type);
}
}
- parent->exchanges->getLock()->release();
+
if(!nowait){
parent->client.getExchange().declareOk(channel);
}
}
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel,
u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel,
u_int16_t /*ticket*/,
+ string& exchange, bool
/*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->getLock()->acquire();
parent->exchanges->destroy(exchange);
- parent->exchanges->getLock()->release();
if(!nowait) parent->client.getExchange().deleteOk(channel);
}
@@ -290,7 +278,7 @@
FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange* exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
if(exchange){
if(routingKey.empty() && queueName.empty()) routingKey =
queue->getName();
exchange->bind(queue, routingKey, &arguments);
@@ -371,7 +359,7 @@
string& exchangeName,
string& routingKey,
bool mandatory, bool
immediate){
- Exchange* exchange = exchangeName.empty() ?
parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ?
parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey,
mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h Tue Oct
31 10:33:40 2006
@@ -97,7 +97,7 @@
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Exchange* findExchange(const string& name);
+ Exchange::shared_ptr findExchange(const string& name);
public:
SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry*
queues,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Oct 31
10:33:40 2006
@@ -77,6 +77,8 @@
static const std::string typeName;
TopicExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey,
qpid::framing::FieldTable* args);
Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Tue Oct 31 10:33:40 2006
@@ -23,6 +23,7 @@
import Queue, logging
from qpid.testlib import TestBase
from qpid.content import Content
+from qpid.client import Closed
class StandardExchangeVerifier:
@@ -207,10 +208,14 @@
class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
"""
If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
-
-
+ channel exception with reply code 404 (not found).
"""
+ def test(self):
+ try:
+ self.channel.exchange_declare(exchange="humpty_dumpty",
passive=True)
+ self.fail("Expected 404 for passive declaration of unknown
exchange.")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
class DeclareMethodDurableFieldSupportRuleTests(TestBase):
@@ -292,3 +297,28 @@
self.myBasicPublish({"irrelevant":0})
self.assertEmpty(self.q)
+
+class MiscellaneousErrorsTests(TestBase):
+ """
+ Test some miscellaneous error conditions
+ """
+ def testTypeNotKnown(self):
+ try:
+
self.channel.exchange_declare(exchange="test_type_not_known_exchange",
type="invalid_type")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def testDifferentDeclaredType(self):
+
self.channel.exchange_declare(exchange="test_different_declared_type_exchange",
type="direct")
+ try:
+
self.channel.exchange_declare(exchange="test_different_declared_type_exchange",
type="topic")
+ self.fail("Expected 507 for redeclaration of exchange with
different type.")
+ except Closed, e:
+ self.assertConnectionException(507, e.args[0])
+ #cleanup
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+ c2.exchange_delete(exchange="test_different_declared_type_exchange")
+