Author: gsim
Date: Thu Nov  8 06:05:38 2007
New Revision: 593159

URL: http://svn.apache.org/viewvc?rev=593159&view=rev
Log:
Make standard exchanges durable
Ensure flags are set correctly for recovered messages with no content


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov  8 06:05:38 2007
@@ -128,10 +128,6 @@
     }
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
-    exchanges.declare(amq_direct, DirectExchange::typeName);
-    exchanges.declare(amq_topic, TopicExchange::typeName);
-    exchanges.declare(amq_fanout, FanOutExchange::typeName);
-    exchanges.declare(amq_match, HeadersExchange::typeName);
     
     if(conf.enableMgmt) {
         QPID_LOG(info, "Management enabled");
@@ -153,6 +149,11 @@
              store->recover(recoverer);
         }
     }
+    //ensure standard exchanges exist (done after recovery from store)
+    declareStandardExchange(amq_direct, DirectExchange::typeName);
+    declareStandardExchange(amq_topic, TopicExchange::typeName);
+    declareStandardExchange(amq_fanout, FanOutExchange::typeName);
+    declareStandardExchange(amq_match, HeadersExchange::typeName);
 
     // Initialize plugins
     const Plugin::Plugins& plugins=Plugin::getPlugins();
@@ -160,6 +161,15 @@
          i != plugins.end();
          i++)
         (*i)->initialize(*this);
+}
+
+void Broker::declareStandardExchange(const std::string& name, const 
std::string& type)
+{
+    bool storeEnabled = store.get();
+    std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, 
type, storeEnabled);
+    if (status.second && storeEnabled) {
+        store->create(*status.first);
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Nov  8 06:05:38 2007
@@ -136,6 +136,7 @@
     ManagementObjectVhost::shared_ptr  mgmtVhostObject;
 
     static MessageStore* createStore(const Options& config);
+    void declareStandardExchange(const std::string& name, const std::string& 
type);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Nov  8 06:05:38 2007
@@ -40,7 +40,7 @@
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type)
             throw(UnknownExchangeTypeException);
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type, 
-                                                      bool durable, const 
qpid::framing::FieldTable& args)
+                                                      bool durable, const 
qpid::framing::FieldTable& args = framing::FieldTable())
             throw(UnknownExchangeTypeException);
         void destroy(const std::string& name);
         Exchange::shared_ptr get(const std::string& name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov  8 06:05:38 2007
@@ -36,7 +36,7 @@
 TransferAdapter Message::TRANSFER;
 PublishAdapter Message::PUBLISH;
 
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), 
redelivered(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), 
redelivered(false), loaded(false), publisher(0), adapter(0) {}
 
 std::string Message::getRoutingKey() const
 {
@@ -121,12 +121,20 @@
 
 void Message::decodeContent(framing::Buffer& buffer)
 {
-    //get the data as a string and set that as the content
-    //body on a frame then add that frame to the frameset
-    AMQFrame frame;
-    frame.setBody(AMQContentBody());
-    frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
-    frames.append(frame);
+    if (buffer.available()) {
+        //get the data as a string and set that as the content
+        //body on a frame then add that frame to the frameset
+        AMQFrame frame;
+        frame.setBody(AMQContentBody());
+        frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+        frames.append(frame);
+    } else {
+        //adjust header flags
+        MarkLastSegment f;
+        frames.map_if(f, TypeFilter(HEADER_BODY));    
+    }
+    //mark content loaded
+    loaded = true;
 }
 
 void Message::releaseContent(MessageStore* _store)
@@ -205,5 +213,5 @@
 
 bool Message::isContentLoaded() const
 {
-    return contentSize() > 0;
+    return loaded;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov  8 06:05:38 2007
@@ -124,6 +124,7 @@
     mutable boost::shared_ptr<Exchange> exchange;
     mutable uint64_t persistenceId;
     bool redelivered;
+    bool loaded;
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov  8 06:05:38 2007
@@ -53,18 +53,18 @@
 
 void NullMessageStore::create(PersistableQueue& queue)
 {
-    QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. 
Persistence not enabled.");
+    QPID_LOG(info, "Queue '" << queue.getName() 
+             << "' will not be durable. Persistence not enabled.");
 }
 
-void NullMessageStore::destroy(PersistableQueue& queue)
+void NullMessageStore::destroy(PersistableQueue&)
 {
-    QPID_LOG(info, "Can't destroy durable queue '" << queue.getName() << "'. 
Persistence not enabled.");
 }
 
 void NullMessageStore::create(const PersistableExchange& exchange)
 {
-    QPID_LOG(info, "Can't create durable exchange '"
-             << exchange.getName() << "'. Persistence not enabled.");
+    QPID_LOG(info, "Exchange'" << exchange.getName() 
+             << "' will not be durable. Persistence not enabled.");
 }
 
 void NullMessageStore::destroy(const PersistableExchange& )
@@ -86,12 +86,11 @@
 
 void NullMessageStore::destroy(PersistableMessage&)
 {
-    QPID_LOG(info, "No need to destroy staged message. Persistence not 
enabled.");
 }
 
 void NullMessageStore::appendContent(const PersistableMessage&, const string&)
 {
-    QPID_LOG(info, "Can't load content. Persistence not enabled.");
+    QPID_LOG(info, "Can't append content. Persistence not enabled.");
 }
 
 void NullMessageStore::loadContent(const PersistableMessage&, string&, 
uint64_t, uint32_t)
@@ -102,18 +101,16 @@
 void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, 
const PersistableQueue& queue)
 {
     msg.enqueueComplete(); 
-    QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. 
Persistence not enabled.");
+    QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() 
<< "'. Persistence not enabled.");
 }
 
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, 
const PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, 
const PersistableQueue&)
 {
     msg.dequeueComplete();
-    QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. 
Persistence not enabled.");
 }
 
-void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+void NullMessageStore::flush(const qpid::broker::PersistableQueue&)
 {
-    QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << 
queue.getName());
 }
 
 u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov  8 06:05:38 2007
@@ -38,7 +38,7 @@
 public:
     NullMessageStore(bool warn = false);
 
-       virtual bool init(const std::string& dir, const bool async, const bool 
force);
+    virtual bool init(const std::string& dir, const bool async, const bool 
force);
     virtual std::auto_ptr<TransactionContext> begin();
     virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     virtual void prepare(TPCTransactionContext& txn);
@@ -63,7 +63,7 @@
     virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
     virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
-       virtual void flush(const qpid::broker::PersistableQueue& queue);
+    virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=593159&r1=593158&r2=593159&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Thu Nov  8 06:05:38 2007
@@ -111,6 +111,12 @@
     }
 };
 
+class MarkLastSegment
+{
+public:
+    void operator()(AMQFrame& f) const { f.setEof(true); }
+};
+
 }
 }
 


Reply via email to