Author: gsim
Date: Tue Oct 31 10:33:40 2006
New Revision: 469599

URL: http://svn.apache.org/viewvc?view=rev&rev=469599
Log:
Hid locking inside the exchange registry, switched exchanges to shared_ptr,
and added some extra error handling and tests.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/python/tests/exchange.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Tue Oct 31 
10:33:40 2006
@@ -155,7 +155,7 @@
     if(blocked) queue->dispatch();
 }
 
-void Channel::handlePublish(Message* _message, Exchange* _exchange){
+void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
     Message::shared_ptr message(_message);
     exchange = _exchange;
     messageBuilder.initialise(message);
@@ -179,7 +179,7 @@
             DeliverableMessage deliverable(msg);
             exchange->route(deliverable, msg->getRoutingKey(), 
&(msg->getHeaderProperties()->getHeaders()));
         }
-        exchange = 0;
+        exchange.reset();
     }else{
         std::cout << "Exchange not known in 
Channel::complete(Message::shared_ptr&)" << std::endl;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Tue Oct 31 10:33:40 
2006
@@ -82,7 +82,7 @@
             AccumulatedAck accumulatedAck;
             TransactionalStore* store;
             MessageBuilder messageBuilder;//builder for in-progress message
-            Exchange* exchange;//exchange to which any in-progress message was 
published to
+            Exchange::shared_ptr exchange;//exchange to which any in-progress 
message was published to
 
             virtual void complete(Message::shared_ptr& msg);
             void deliver(Message::shared_ptr& msg, string& tag, 
Queue::shared_ptr& queue, bool ackExpected);            
@@ -107,7 +107,7 @@
             void ack(u_int64_t deliveryTag, bool multiple);
             void recover(bool requeue);
             void deliver(Message::shared_ptr& msg, const string& consumerTag, 
u_int64_t deliveryTag);            
-            void handlePublish(Message* msg, Exchange* exchange);
+            void handlePublish(Message* msg, Exchange::shared_ptr exchange);
             void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
             void handleContent(qpid::framing::AMQContentBody::shared_ptr 
content);
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Tue Oct 31 
10:33:40 2006
@@ -36,6 +36,8 @@
         static const std::string typeName;
         
         DirectExchange(const std::string& name);
+
+        virtual std::string getType(){ return typeName; }            
         
         virtual void bind(Queue::shared_ptr queue, const std::string& 
routingKey, qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Oct 31 
10:33:40 2006
@@ -18,23 +18,27 @@
 #ifndef _Exchange_
 #define _Exchange_
 
+#include <boost/shared_ptr.hpp>
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/FieldTable.h"
 
 namespace qpid {
-namespace broker {
-    class Exchange{
-        const std::string name;
-      public:
-        explicit Exchange(const std::string& _name) : name(_name) {}
-        virtual ~Exchange(){}
-        std::string getName() { return name; }
-        virtual void bind(Queue::shared_ptr queue, const string& routingKey, 
qpid::framing::FieldTable* args) = 0;
-        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, 
qpid::framing::FieldTable* args) = 0;
-        virtual void route(Deliverable& msg, const string& routingKey, 
qpid::framing::FieldTable* args) = 0;
-    };
-}
+    namespace broker {
+        class Exchange{
+            const std::string name;
+        public:
+            typedef boost::shared_ptr<Exchange> shared_ptr;
+
+            explicit Exchange(const std::string& _name) : name(_name){}
+            virtual ~Exchange(){}
+            std::string getName() { return name; }
+            virtual std::string getType() = 0;
+            virtual void bind(Queue::shared_ptr queue, const string& 
routingKey, qpid::framing::FieldTable* args) = 0;
+            virtual void unbind(Queue::shared_ptr queue, const string& 
routingKey, qpid::framing::FieldTable* args) = 0;
+            virtual void route(Deliverable& msg, const string& routingKey, 
qpid::framing::FieldTable* args) = 0;
+        };
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Oct 
31 10:33:40 2006
@@ -16,33 +16,46 @@
  *
  */
 #include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/TopicExchange.h"
 
 using namespace qpid::broker;
 using namespace qpid::concurrent;
+using std::pair;
 
-ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
-
-ExchangeRegistry::~ExchangeRegistry(){
-    for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); 
++i)
-    {
-        delete i->second;
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, 
const string& type) throw(UnknownExchangeTypeException){
+    Locker locker(lock);
+    ExchangeMap::iterator i =  exchanges.find(name);
+    if (i == exchanges.end()) {
+       Exchange::shared_ptr exchange;
+
+        if(type == TopicExchange::typeName){
+            exchange = Exchange::shared_ptr(new TopicExchange(name));
+        }else if(type == DirectExchange::typeName){
+            exchange = Exchange::shared_ptr(new DirectExchange(name));
+        }else if(type == FanOutExchange::typeName){
+            exchange = Exchange::shared_ptr(new FanOutExchange(name));
+        }else if (type == HeadersExchange::typeName) {
+            exchange = Exchange::shared_ptr(new HeadersExchange(name));
+        }else{
+            throw UnknownExchangeTypeException();    
+        }
+       exchanges[name] = exchange;
+       return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+    } else {
+       return std::pair<Exchange::shared_ptr, bool>(i->second, false);
     }
-    delete lock;
-}
-
-void ExchangeRegistry::declare(Exchange* exchange){
-    exchanges[exchange->getName()] = exchange;
 }
 
 void ExchangeRegistry::destroy(const string& name){
-    if(exchanges[name]){
-        delete exchanges[name];
-        exchanges.erase(name);
-    }
+    Locker locker(lock);
+    exchanges.erase(name);
 }
 
-Exchange* ExchangeRegistry::get(const string& name){
+Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+    Locker locker(lock);
     return exchanges[name];
 }
 
@@ -51,7 +64,7 @@
 const std::string empty;
 }
 
-Exchange* ExchangeRegistry::getDefault()
+Exchange::shared_ptr ExchangeRegistry::getDefault()
 {
     return get(empty);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Oct 31 
10:33:40 2006
@@ -20,22 +20,21 @@
 
 #include <map>
 #include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/MonitorImpl.h"
 
 namespace qpid {
 namespace broker {
+    struct UnknownExchangeTypeException{};
+
     class ExchangeRegistry{
-        typedef std::map<string, Exchange*> ExchangeMap;
+        typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
         ExchangeMap exchanges;
-        qpid::concurrent::Monitor* lock;
+        qpid::concurrent::MonitorImpl lock;
     public:
-        ExchangeRegistry();
-        void declare(Exchange* exchange);
+        std::pair<Exchange::shared_ptr, bool> declare(const string& name, 
const string& type) throw(UnknownExchangeTypeException);
         void destroy(const string& name);
-        Exchange* get(const string& name);
-        Exchange* getDefault();
-        inline qpid::concurrent::Monitor* getLock(){ return lock; }
-        ~ExchangeRegistry();
+        Exchange::shared_ptr get(const string& name);
+        Exchange::shared_ptr getDefault();
     };
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Tue Oct 31 
10:33:40 2006
@@ -37,6 +37,8 @@
     static const std::string typeName;
         
     FanOutExchange(const std::string& name);
+
+    virtual std::string getType(){ return typeName; }            
         
     virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, 
qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Tue Oct 31 
10:33:40 2006
@@ -40,6 +40,8 @@
     static const std::string typeName;
 
     HeadersExchange(const string& name);
+    
+    virtual std::string getType(){ return typeName; }            
         
     virtual void bind(Queue::shared_ptr queue, const string& routingKey, 
qpid::framing::FieldTable* args);
 

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
Tue Oct 31 10:33:40 2006
@@ -33,11 +33,11 @@
 }
 
 SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : 
timeout(_timeout), cleaner(&queues, timeout/10){
-    exchanges.declare(new DirectExchange(empty)); // Default exchange.
-    exchanges.declare(new DirectExchange(amq_direct));
-    exchanges.declare(new TopicExchange(amq_topic));
-    exchanges.declare(new FanOutExchange(amq_fanout));
-    exchanges.declare(new HeadersExchange(amq_match));
+    exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+    exchanges.declare(amq_direct, DirectExchange::typeName);
+    exchanges.declare(amq_topic, TopicExchange::typeName);
+    exchanges.declare(amq_fanout, FanOutExchange::typeName);
+    exchanges.declare(amq_match, HeadersExchange::typeName);
     cleaner.start();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Tue 
Oct 31 10:33:40 2006
@@ -73,11 +73,8 @@
 }
 
 
-Exchange* SessionHandlerImpl::findExchange(const string& name){
-    exchanges->getLock()->acquire();
-    Exchange* exchange(exchanges->get(name));
-    exchanges->getLock()->release();
-    return exchange;
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+    return exchanges->get(name);
 }
 
 void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -217,40 +214,31 @@
                                                       bool passive, bool 
/*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
                                                       FieldTable& 
/*arguments*/){
 
-    if(!passive && (
-           type != TopicExchange::typeName &&
-           type != DirectExchange::typeName &&
-           type != FanOutExchange::typeName &&
-           type != HeadersExchange::typeName
-       )
-        )
-    {
-        throw ChannelException(540, "Exchange type not implemented: " + type);
-    }
-    
-    parent->exchanges->getLock()->acquire();
-    if(!parent->exchanges->get(exchange)){
-        if(type == TopicExchange::typeName){
-            parent->exchanges->declare(new TopicExchange(exchange));
-        }else if(type == DirectExchange::typeName){
-            parent->exchanges->declare(new DirectExchange(exchange));
-        }else if(type == FanOutExchange::typeName){
-            parent->exchanges->declare(new DirectExchange(exchange));
-        }else if (type == HeadersExchange::typeName) {
-            parent->exchanges->declare(new HeadersExchange(exchange));
+    if(passive){
+        if(!parent->exchanges->get(exchange)){
+            throw ChannelException(404, "Exchange not found: " + exchange);    
        
+        }
+    }else{        
+        try{
+            std::pair<Exchange::shared_ptr, bool> response = 
parent->exchanges->declare(exchange, type);
+            if(!response.second && response.first->getType() != type){
+                throw ConnectionException(507, "Exchange already declared to 
be of type " 
+                                          + response.first->getType() + ", 
requested " + type);
+            }
+        }catch(UnknownExchangeTypeException& e){
+            throw ConnectionException(503, "Exchange type not implemented: " + 
type);
         }
     }
-    parent->exchanges->getLock()->release();
+    
     if(!nowait){
         parent->client.getExchange().declareOk(channel);
     }
 } 
         
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, 
u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, 
u_int16_t /*ticket*/, 
+                                                      string& exchange, bool 
/*ifUnused*/, bool nowait){
     //TODO: implement unused
-    parent->exchanges->getLock()->acquire();
     parent->exchanges->destroy(exchange);
-    parent->exchanges->getLock()->release();
     if(!nowait) parent->client.getExchange().deleteOk(channel);
 } 
 
@@ -290,7 +278,7 @@
                                                 FieldTable& arguments){
 
     Queue::shared_ptr queue = parent->getQueue(queueName, channel);
-    Exchange* exchange = parent->exchanges->get(exchangeName);
+    Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
     if(exchange){
         if(routingKey.empty() && queueName.empty()) routingKey = 
queue->getName();
         exchange->bind(queue, routingKey, &arguments);
@@ -371,7 +359,7 @@
                                                    string& exchangeName, 
string& routingKey, 
                                                    bool mandatory, bool 
immediate){
 
-    Exchange* exchange = exchangeName.empty() ? 
parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+    Exchange::shared_ptr exchange = exchangeName.empty() ? 
parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
     if(exchange){
         Message* msg = new Message(parent, exchangeName, routingKey, 
mandatory, immediate);
         parent->getChannel(channel)->handlePublish(msg, exchange);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h Tue Oct 
31 10:33:40 2006
@@ -97,7 +97,7 @@
      */
     Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
 
-    Exchange* findExchange(const string& name);
+    Exchange::shared_ptr findExchange(const string& name);
     
   public:
     SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* 
queues, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Oct 31 
10:33:40 2006
@@ -77,6 +77,8 @@
     static const std::string typeName;
 
     TopicExchange(const string& name);
+
+    virtual std::string getType(){ return typeName; }            
         
     virtual void bind(Queue::shared_ptr queue, const string& routingKey, 
qpid::framing::FieldTable* args);
 

Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=469599&r1=469598&r2=469599
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Tue Oct 31 10:33:40 2006
@@ -23,6 +23,7 @@
 import Queue, logging
 from qpid.testlib import TestBase
 from qpid.content import Content
+from qpid.client import Closed
 
 
 class StandardExchangeVerifier:
@@ -207,10 +208,14 @@
 class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
     """
     If set, and the exchange does not already exist, the server MUST raise a
-    channel exception with reply code 404 (not found).
-    
-    
+    channel exception with reply code 404 (not found).    
     """
+    def test(self):
+        try:
+            self.channel.exchange_declare(exchange="humpty_dumpty", 
passive=True)
+            self.fail("Expected 404 for passive declaration of unknown 
exchange.")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
 
 
 class DeclareMethodDurableFieldSupportRuleTests(TestBase):
@@ -292,3 +297,28 @@
         self.myBasicPublish({"irrelevant":0})
         self.assertEmpty(self.q)
 
+
+class MiscellaneousErrorsTests(TestBase):
+    """
+    Test some miscellaneous error conditions
+    """
+    def testTypeNotKnown(self):
+        try:
+            
self.channel.exchange_declare(exchange="test_type_not_known_exchange", 
type="invalid_type")
+            self.fail("Expected 503 for declaration of unknown exchange type.")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def testDifferentDeclaredType(self):
+        
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", 
type="direct")
+        try:
+            
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", 
type="topic")
+            self.fail("Expected 507 for redeclaration of exchange with 
different type.")
+        except Closed, e:
+            self.assertConnectionException(507, e.args[0])
+        #cleanup    
+        other = self.connect()
+        c2 = other.channel(1)
+        c2.channel_open()
+        c2.exchange_delete(exchange="test_different_declared_type_exchange")
+    


Reply via email to