Author: gsim
Date: Wed Dec  6 04:01:40 2006
New Revision: 483046

URL: http://svn.apache.org/viewvc?view=rev&rev=483046
Log:
Added a new configuration option for the staging threshold (the size above which
messages will be written to disk as content arrives, rather than accumulating
that content in memory). Pass this through to all channels and to the store on
recovery.


Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp Wed Dec  6 04:01:40 2006
@@ -31,7 +31,7 @@
 using namespace qpid::sys;
 
 
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, 
MessageStore* const _store, u_int64_t _stagingThreshold) :
     id(_id), 
     out(_out), 
     currentDeliveryTag(1),
@@ -40,8 +40,8 @@
     prefetchCount(0),
     framesize(_framesize),
     tagGenerator("sgen"),
-    store(0),
-    messageBuilder(this){
+    store(_store),
+    messageBuilder(this, _store, _stagingThreshold){
 
     outstanding.reset();
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.h Wed Dec  6 04:01:40 2006
@@ -36,7 +36,7 @@
 #include <NameGenerator.h>
 #include <Prefetch.h>
 #include <BrokerQueue.h>
-#include <TransactionalStore.h>
+#include <MessageStore.h>
 #include <TxAck.h>
 #include <TxBuffer.h>
 #include <TxPublish.h>
@@ -85,7 +85,7 @@
             qpid::sys::Mutex deliveryLock;
             TxBuffer txBuffer;
             AccumulatedAck accumulatedAck;
-            TransactionalStore* store;
+            MessageStore* const store;
             MessageBuilder messageBuilder;//builder for in-progress message
             Exchange::shared_ptr exchange;//exchange to which any in-progress 
message was published to
 
@@ -95,7 +95,8 @@
             bool checkPrefetch(Message::shared_ptr& msg);
         
         public:
-            Channel(qpid::framing::OutputHandler* out, int id, u_int32_t 
framesize);
+            Channel(qpid::framing::OutputHandler* out, int id, u_int32_t 
framesize, 
+                    MessageStore* const _store = 0, u_int64_t stagingThreshold 
= 0);
             ~Channel();
             inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue 
= queue; }
             inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.cpp Wed Dec  6 04:01:40 2006
@@ -32,6 +32,7 @@
     maxConnections("max-connections", "Set the maximum number of connections 
the broker can accept (default=500).", 500),
     connectionBacklog("connection-backlog", "Set the connection backlog for 
the servers socket (default=10)", 10),
     store('s', "store", "Set the message store module to use (default='' which 
implies no store)", ""),
+    stagingThreshold("staging-threshold", "Set the message size threshold 
above which messages will be written to disk as they arrive 
(default=5,000,000)", 5000000),
     help("help", "Print usage information", false),
     version("version", "Print version information", false)
 {
@@ -41,6 +42,7 @@
     options.push_back(&maxConnections);
     options.push_back(&connectionBacklog);
     options.push_back(&store);
+    options.push_back(&stagingThreshold);
     options.push_back(&help);
     options.push_back(&version);
 }
@@ -106,6 +108,11 @@
     return store.getValue();
 }
 
+long Configuration::getStagingThreshold() const {
+    return stagingThreshold.getValue();
+}
+
+
 Configuration::Option::Option(const char _flag, const string& _name, const 
string& _desc) :
     flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
 
@@ -190,6 +197,28 @@
 
 void Configuration::IntOption::setValue(const std::string& _value){
     value = atoi(_value.c_str());
+}
+
+// Long Option:
+
+Configuration::LongOption::LongOption(const char _flag, const string& _name, 
const string& _desc, const long _value) :
+    Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::LongOption(const string& _name, const string& 
_desc, const long _value) :
+    Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::~LongOption(){}
+
+long Configuration::LongOption::getValue() const {
+    return value;
+}
+
+bool Configuration::LongOption::needsValue() const {
+    return true;
+}
+
+void Configuration::LongOption::setValue(const std::string& _value){
+    value = atol(_value.c_str());
 }
 
 // Bool Option:

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Configuration.h Wed Dec  6 04:01:40 2006
@@ -63,6 +63,20 @@
                 virtual void setValue(int _value) { value = _value; }
             };
 
+            class LongOption : public Option{
+                const long defaultValue;
+                int value;
+            public:
+                LongOption(char flag, const std::string& name, const 
std::string& desc, const long value = 0);
+                LongOption(const std::string& name, const std::string& desc, 
const long value = 0);
+                virtual ~LongOption();
+
+                long getValue() const;
+                virtual bool needsValue() const;
+                virtual void setValue(const std::string& value);
+                virtual void setValue(int _value) { value = _value; }
+            };
+
             class StringOption : public Option{
                 const std::string defaultValue;
                 std::string value;
@@ -96,6 +110,7 @@
             IntOption maxConnections;
             IntOption connectionBacklog;
             StringOption store;
+            LongOption stagingThreshold;
             BoolOption help;
             BoolOption version;
             char const *programName;
@@ -123,6 +138,7 @@
             int getMaxConnections() const;
             int getConnectionBacklog() const;
             const std::string& getStore() const;
+            long getStagingThreshold() const;
 
             void setHelp(bool b) { help.setValue(b); }
             void setVersion(bool b) { version.setValue(b); }
@@ -132,6 +148,7 @@
             void setMaxConnections(int i) { maxConnections.setValue(i); }
             void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
             void setStore(const std::string& s) { store.setValue(s); }
+            void setStagingThreshold(long l) { stagingThreshold.setValue(l); }
 
             void usage();
         };

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp Wed Dec  6 04:01:40 2006
@@ -73,3 +73,7 @@
     } while(queues.find(name) != queues.end());
     return name;
 }
+
+MessageStore* const QueueRegistry::getStore() const {
+    return store;
+}

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h Wed Dec  6 04:01:40 2006
@@ -74,6 +74,12 @@
      */
     string generateName();
 
+    /**
+     * Return the message store used.
+     */
+    MessageStore* const getStore() const;
+
+
   private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Wed Dec  6 04:01:40 2006
@@ -39,9 +39,9 @@
 const std::string amq_match("amq.match");
 }
 
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& 
_store, u_int32_t _timeout) : 
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& 
_store, u_int64_t _stagingThreshold, u_int32_t _timeout) : 
     store(_store.empty() ? (MessageStore*)  new NullMessageStore() : 
(MessageStore*) new MessageStoreModule(_store)), 
-    queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+    queues(store.get()), settings(_timeout, _stagingThreshold), 
cleaner(&queues, _timeout/10)
 {
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);
@@ -51,7 +51,8 @@
 
     if(store.get()) {
         RecoveryManager recoverer(queues, exchanges);
-        store->recover(recoverer);
+        MessageStoreSettings storeSettings = { settings.stagingThreshold };
+        store->recover(recoverer, &storeSettings);
     }
 
     cleaner.start();
@@ -59,7 +60,7 @@
 
 SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
 {
-    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, 
timeout);
+    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, 
settings);
 }
 
 SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h Wed Dec  6 04:01:40 2006
@@ -31,6 +31,7 @@
 #include <sys/SessionHandler.h>
 #include <sys/SessionHandlerFactory.h>
 #include <sys/TimeoutHandler.h>
+#include <SessionHandlerImpl.h>
 #include <memory>
 
 namespace qpid {
@@ -41,10 +42,10 @@
             std::auto_ptr<MessageStore> store;
             QueueRegistry queues;
             ExchangeRegistry exchanges;
-            const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+            const Settings settings;
             AutoDelete cleaner;
         public:
-            SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t 
timeout = 30000);
+            SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t 
stagingThreshold = 0, u_int32_t timeout = 30000);
             virtual qpid::sys::SessionHandler* 
create(qpid::sys::SessionContext* ctxt);
             virtual ~SessionHandlerFactoryImpl();
         };

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Wed Dec  6 04:01:40 2006
@@ -35,7 +35,7 @@
                                        QueueRegistry* _queues, 
                                        ExchangeRegistry* _exchanges, 
                                        AutoDelete* _cleaner,
-                                       const u_int32_t _timeout) :
+                                       const Settings& _settings) :
     context(_context), 
 // AMQP version management change - kpvdr 2006-11-17
 // TODO: Make this class version-aware and link these hard-wired numbers to 
that version
@@ -43,7 +43,7 @@
     queues(_queues), 
     exchanges(_exchanges),
     cleaner(_cleaner),
-    timeout(_timeout),
+    settings(_settings),
     basicHandler(new BasicHandlerImpl(this)),
     channelHandler(new ChannelHandlerImpl(this)),
     connectionHandler(new ConnectionHandlerImpl(this)),
@@ -200,7 +200,8 @@
 
 
 void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const 
string& /*outOfBand*/){
-    parent->channels[channel] = new Channel(parent->context, channel, 
parent->framemax);
+    parent->channels[channel] = new Channel(parent->context, channel, 
parent->framemax, 
+                                            parent->queues->getStore(), 
parent->settings.stagingThreshold);
     parent->client.getChannel().openOk(channel);
 } 
         
@@ -262,7 +263,7 @@
        queue = parent->getQueue(name, channel);
     } else {
        std::pair<Queue::shared_ptr, bool> queue_created =  
-            parent->queues->declare(name, durable, autoDelete ? 
parent->timeout : 0, exclusive ? parent : 0);
+            parent->queues->declare(name, durable, autoDelete ? 
parent->settings.timeout : 0, exclusive ? parent : 0);
        queue = queue_created.first;
        assert(queue);
        if (queue_created.second) { // This is a new queue

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Wed Dec  6 04:01:40 2006
@@ -60,6 +60,14 @@
     const char* what() const throw() { return text.c_str(); }
 };
 
+class Settings {
+public:
+    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+    const u_int64_t stagingThreshold;
+
+    Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : 
timeout(_timeout), stagingThreshold(_stagingThreshold) {}
+};
+
 class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, 
                            public virtual 
qpid::framing::AMQP_ServerOperations, 
                            public virtual ConnectionToken
@@ -72,7 +80,7 @@
     QueueRegistry* queues;
     ExchangeRegistry* const exchanges;
     AutoDelete* const cleaner;
-    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+    const Settings settings;
 
     std::auto_ptr<BasicHandler> basicHandler;
     std::auto_ptr<ChannelHandler> channelHandler;
@@ -104,7 +112,7 @@
     
   public:
     SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* 
queues, 
-                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const 
u_int32_t timeout);
+                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const 
Settings& settings);
     virtual void received(qpid::framing::AMQFrame* frame);
     virtual void initiated(qpid::framing::ProtocolInitiation* header);
     virtual void idleOut();

Modified: incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp?view=diff&rev=483046&r1=483045&r2=483046
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ConfigurationTest.cpp Wed Dec  6 04:01:40 2006
@@ -32,6 +32,7 @@
     CPPUNIT_TEST(testPortLongForm);
     CPPUNIT_TEST(testPortShortForm);
     CPPUNIT_TEST(testStore);
+    CPPUNIT_TEST(testStagingThreshold);
     CPPUNIT_TEST(testVarious);
     CPPUNIT_TEST_SUITE_END();
 
@@ -68,6 +69,15 @@
         conf.parse("ignore", 3, argv);
         std::string expected("my-store-module.so");
         CPPUNIT_ASSERT_EQUAL(expected, conf.getStore());
+    }
+
+    void testStagingThreshold() 
+    {
+        Configuration conf;
+        char* argv[] = {"ignore", "--staging-threshold", "123456789"};
+        conf.parse("ignore", 3, argv);
+        long expected = 123456789;
+        CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold());
     }
 
     void testVarious() 


Reply via email to