Author: gsim
Date: Wed Nov 15 02:28:11 2006
New Revision: 475181

URL: http://svn.apache.org/viewvc?view=rev&rev=475181
Log:
Added ability for broker to load a message store implementation from a library.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Nov 15 
02:28:11 2006
@@ -29,7 +29,8 @@
 Broker::Broker(const Configuration& config) :
     acceptor(Acceptor::create(config.getPort(),
                               config.getConnectionBacklog(),
-                              config.getWorkerThreads()))
+                              config.getWorkerThreads())),
+    factory(config.getStore())
 { }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp Wed Nov 15 
02:28:11 2006
@@ -30,6 +30,7 @@
     workerThreads("worker-threads", "Sets the number of worker threads to use 
(default=5).", 5),
     maxConnections("max-connections", "Sets the maximum number of connections 
the broker can accept (default=500).", 500),
     connectionBacklog("connection-backlog", "Sets the connection backlog for 
the servers socket (default=10)", 10),
+    store('s', "store", "Sets the message store module to use (default='' 
which implies no store)", ""),
     help("help", "Prints usage information", false)
 {
     options.push_back(&trace);
@@ -37,6 +38,7 @@
     options.push_back(&workerThreads);
     options.push_back(&maxConnections);
     options.push_back(&connectionBacklog);
+    options.push_back(&store);
     options.push_back(&help);
 }
 
@@ -84,6 +86,10 @@
 
 int Configuration::getConnectionBacklog() const {
     return connectionBacklog.getValue();
+}
+
+const std::string& Configuration::getStore() const {
+    return store.getValue();
 }
 
 Configuration::Option::Option(const char _flag, const string& _name, const 
string& _desc) : 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h Wed Nov 15 
02:28:11 2006
@@ -95,6 +95,7 @@
             IntOption workerThreads;
             IntOption maxConnections;
             IntOption connectionBacklog;
+            StringOption store;
             BoolOption help;
 
             typedef std::vector<Option*>::iterator op_iterator;
@@ -118,6 +119,7 @@
             int getWorkerThreads() const;
             int getMaxConnections() const;
             int getConnectionBacklog() const;
+            const std::string& getStore() const;
 
             void setHelp(bool b) { help.setValue(b); }
             void setTrace(bool b) { trace.setValue(b); }
@@ -125,6 +127,7 @@
             void setWorkerThreads(int i) { workerThreads.setValue(i); }
             void setMaxConnections(int i) { maxConnections.setValue(i); }
             void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
+            void setStore(const std::string& s) { store.setValue(s); }
 
             void usage();
         };

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=auto&rev=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Wed 
Nov 15 02:28:11 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
+{
+}
+
+void MessageStoreModule::create(const Queue& queue)
+{
+    store->create(queue);
+}
+
+void MessageStoreModule::destroy(const Queue& queue)
+{
+    store->destroy(queue);
+}
+
+void MessageStoreModule::recover(QueueRegistry& registry)
+{
+    store->recover(registry);
+}
+
+void MessageStoreModule::enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+{
+    store->enqueue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::dequeue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+{
+    store->dequeue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::committed(const string * const xid)
+{
+    store->committed(xid);
+}
+
+void MessageStoreModule::aborted(const string * const xid)
+{
+    store->aborted(xid);
+}
+
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+{
+    return store->begin();
+}
+
+void MessageStoreModule::commit(TransactionContext* ctxt)
+{
+    store->commit(ctxt);
+}
+
+void MessageStoreModule::abort(TransactionContext* ctxt)
+{
+    store->abort(ctxt);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=auto&rev=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Wed Nov 
15 02:28:11 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageStore.h>
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
+#include <qpid/sys/Module.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * A null implementation of the MessageStore interface
+         */
+        class MessageStoreModule : public MessageStore{
+            qpid::sys::Module<MessageStore> store;
+        public:
+            MessageStoreModule(const std::string& name);
+            void create(const Queue& queue);
+            void destroy(const Queue& queue);
+            void recover(QueueRegistry& queues);
+            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
+            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
+            void committed(const string * const xid);
+            void aborted(const string * const xid);
+            std::auto_ptr<TransactionContext> begin();
+            void commit(TransactionContext* ctxt);
+            void abort(TransactionContext* ctxt);
+            ~MessageStoreModule(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
Wed Nov 15 02:28:11 2006
@@ -23,6 +23,7 @@
 #include <qpid/broker/DirectExchange.h>
 #include <qpid/broker/FanOutExchange.h>
 #include <qpid/broker/HeadersExchange.h>
+#include <qpid/broker/MessageStoreModule.h>
 #include <qpid/broker/NullMessageStore.h>
 #include <qpid/broker/SessionHandlerImpl.h>
 
@@ -38,8 +39,9 @@
 const std::string amq_match("amq.match");
 }
 
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : 
-    store(new NullMessageStore()), queues(store.get()), timeout(_timeout), 
cleaner(&queues, timeout/10)
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& 
_store, u_int32_t _timeout) : 
+    store(_store.empty() ? (MessageStore*)  new NullMessageStore() : 
(MessageStore*) new MessageStoreModule(_store)), 
+    queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
 {
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h 
Wed Nov 15 02:28:11 2006
@@ -44,7 +44,7 @@
             const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
             AutoDelete cleaner;
         public:
-            SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+            SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t 
timeout = 30000);
             void recover();
             virtual qpid::sys::SessionHandler* 
create(qpid::sys::SessionContext* ctxt);
             virtual ~SessionHandlerFactoryImpl();

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h?view=auto&rev=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h Wed Nov 15 02:28:11 2006
@@ -0,0 +1,161 @@
+#ifndef _sys_Module_h
+#define _sys_Module_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <qpid/QpidError.h>
+
+namespace qpid {
+namespace sys {
+#if USE_APR
+#include <apr-1/apr_dso.h>
+    typedef apr_dso_handle_t* dso_handle_t;
+#else 
+    typedef void* dso_handle_t;
+#endif
+
+    template <class T> class Module : private boost::noncopyable
+    {
+        typedef T* create_t();
+        typedef void destroy_t(T*);
+        
+        dso_handle_t handle;
+        destroy_t* destroy;
+        T* ptr;
+
+        void load(const std::string& name);
+        void unload();
+        void* getSymbol(const std::string& name);
+
+    public:
+        Module(const std::string& name);
+        T* operator->(); 
+        T* get(); 
+        ~Module() throw();
+    };
+
+}
+}
+
+using namespace qpid::sys;
+
+template <class T> Module<T>::Module(const std::string& module) : destroy(0), 
ptr(0) 
+{
+    load(module);
+    //TODO: need a better strategy for symbol names to allow multiple
+    //modules to be loaded without clashes...
+
+    //Note: need the double cast to avoid errors in casting from void* to 
function pointer with -pedantic
+    create_t* create = 
reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create")));
+    destroy = 
reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy")));
+    ptr = create();
+}
+
+template <class T> T* Module<T>::operator->() 
+{ 
+    return ptr; 
+}
+
+template <class T> T* Module<T>::get() 
+{ 
+    return ptr; 
+}
+
+template <class T> Module<T>::~Module() throw()
+{
+    try {
+        if (handle && ptr) {
+            destroy(ptr);
+        }
+        if (handle) unload();
+    } catch (std::exception& e) {
+        std::cout << "Error while destroying module: " << e.what() << 
std::endl;
+    }
+    destroy = 0;
+    handle = 0;
+    ptr = 0;
+}
+
+// APR ================================================================
+#if USE_APR
+
+#include <qpid/apr/APRBase.h>
+#include <qpid/apr/APRPool.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+    CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+}
+
+template <class T> void Module<T>::unload()
+{
+    CHECK_APR_SUCCESS(apr_dso_unload(handle));
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+    apr_dso_handle_sym_t symbol;
+    CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str()));
+    return (void*) symbol;
+}
+
+// POSIX================================================================
+#else 
+
+#include <dlfcn.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+    dlerror();
+    handle = dlopen(name.c_str(), RTLD_NOW);
+    const char* error = dlerror();
+    if (error) {
+        THROW_QPID_ERROR(INTERNAL_ERROR, error);
+    }
+}
+
+template <class T> void Module<T>::unload()
+{
+    dlerror();
+    dlclose(handle);
+    const char* error = dlerror();
+    if (error) {
+        THROW_QPID_ERROR(INTERNAL_ERROR, error);
+    }
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+    dlerror();
+    void* sym = dlsym(handle, name.c_str());
+    const char* error = dlerror();
+    if (error) {
+        THROW_QPID_ERROR(INTERNAL_ERROR, error);
+    }
+    return sym;
+}
+
+#endif //if USE_APR
+
+#endif //ifndef _sys_Module_h
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Module.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp?view=diff&rev=475181&r1=475180&r2=475181
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp 
Wed Nov 15 02:28:11 2006
@@ -31,6 +31,7 @@
     CPPUNIT_TEST(testIsHelp);
     CPPUNIT_TEST(testPortLongForm);
     CPPUNIT_TEST(testPortShortForm);
+    CPPUNIT_TEST(testStore);
     CPPUNIT_TEST(testVarious);
     CPPUNIT_TEST_SUITE_END();
 
@@ -60,8 +61,17 @@
         CPPUNIT_ASSERT_EQUAL(6789, conf.getPort());
     }
 
-    void testVarious() 
+    void testStore() 
     {
+        Configuration conf;
+        char* argv[] = {"ignore", "--store", "my-store-module.so"};
+        conf.parse(3, argv);
+        std::string expected("my-store-module.so");
+        CPPUNIT_ASSERT_EQUAL(expected, conf.getStore());
+    }
+
+    void testVarious() 
+    {        
         Configuration conf;
         char* argv[] = {"ignore", "-t", "--worker-threads", "10"};
         conf.parse(4, argv);


Reply via email to