Author: cctrieloff
Date: Thu Oct 25 19:37:54 2007
New Revision: 588478

URL: http://svn.apache.org/viewvc?rev=588478&view=rev
Log:
- added patch from Tedd
- QPID-660


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Oct 25 19:37:54 2007
@@ -163,6 +163,7 @@
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/IncomingExecutionContext.cpp \
   qpid/broker/ManagementAgent.cpp \
+  qpid/broker/ManagementExchange.cpp \
   qpid/broker/ManagementObject.cpp \
   qpid/broker/ManagementObjectQueue.cpp \
   qpid/broker/Message.cpp \
@@ -260,6 +261,7 @@
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncomingExecutionContext.h \
   qpid/broker/ManagementAgent.h \
+  qpid/broker/ManagementExchange.h \
   qpid/broker/ManagementObject.h \
   qpid/broker/ManagementObjectQueue.h \
   qpid/broker/Message.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 25 
19:37:54 2007
@@ -28,6 +28,7 @@
 #include "NullMessageStore.h"
 #include "RecoveryManagerImpl.h"
 #include "TopicExchange.h"
+#include "ManagementExchange.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
@@ -104,8 +105,8 @@
     dtxManager(store.get())
 {
     if(conf.enableMgmt){
-       managementAgent = ManagementAgent::shared_ptr (new ManagementAgent 
(conf.mgmtPubInterval));
-       queues.setManagementAgent(managementAgent);
+        managementAgent = ManagementAgent::shared_ptr (new ManagementAgent 
(conf.mgmtPubInterval));
+        queues.setManagementAgent(managementAgent);
     }
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -115,16 +116,18 @@
     exchanges.declare(amq_match, HeadersExchange::typeName);
     
     if(conf.enableMgmt) {
-       QPID_LOG(info, "Management enabled");
-        exchanges.declare(qpid_management, TopicExchange::typeName);
-        managementAgent->setExchange (exchanges.get (qpid_management));
+        QPID_LOG(info, "Management enabled");
+        exchanges.declare(qpid_management, ManagementExchange::typeName);
+        Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
+        managementAgent->setExchange (mExchange);
+        dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent 
(managementAgent);
     }
     else
-       QPID_LOG(info, "Management not enabled");
+        QPID_LOG(info, "Management not enabled");
 
     if(store.get()) {
-               store->init(conf.storeDir, conf.storeAsync);
-               RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
+        store->init(conf.storeDir, conf.storeAsync);
+        RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
                                       conf.stagingThreshold);
         store->recover(recoverer);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Oct 
25 19:37:54 2007
@@ -23,6 +23,7 @@
 #include "FanOutExchange.h"
 #include "HeadersExchange.h"
 #include "TopicExchange.h"
+#include "ManagementExchange.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -41,7 +42,7 @@
     RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i == exchanges.end()) {
-       Exchange::shared_ptr exchange;
+        Exchange::shared_ptr exchange;
 
         if(type == TopicExchange::typeName){
             exchange = Exchange::shared_ptr(new TopicExchange(name, durable, 
args));
@@ -51,13 +52,15 @@
             exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, 
args));
         }else if (type == HeadersExchange::typeName) {
             exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, 
args));
+        }else if (type == ManagementExchange::typeName) {
+            exchange = Exchange::shared_ptr(new ManagementExchange(name, 
durable, args));
         }else{
             throw UnknownExchangeTypeException();    
         }
-       exchanges[name] = exchange;
-       return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+        exchanges[name] = exchange;
+        return std::pair<Exchange::shared_ptr, bool>(exchange, true);
     } else {
-       return std::pair<Exchange::shared_ptr, bool>(i->second, false);
+        return std::pair<Exchange::shared_ptr, bool>(i->second, false);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.cpp Thu Oct 
25 19:37:54 2007
@@ -35,9 +35,9 @@
     timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
 }
 
-void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr)
+void ManagementAgent::setExchange (Exchange::shared_ptr _exchange)
 {
-    exchange = exchangePtr;
+    exchange = _exchange;
 }
 
 void ManagementAgent::addObject (ManagementObject::shared_ptr object)
@@ -46,12 +46,6 @@
     QPID_LOG(info, "Management Object Added");
 }
 
-void ManagementAgent::deleteObject (ManagementObject::shared_ptr object)
-{
-    managementObjects.remove (object);
-    QPID_LOG (debug, "Management Object Removed");
-}
-
 ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t 
_seconds)
     : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), 
agent(_agent) {}
 
@@ -61,6 +55,18 @@
     agent.PeriodicProcessing ();
 }
 
+void ManagementAgent::clientAdded (void)
+{
+    for (ManagementObjectList::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         iter++)
+    {
+        ManagementObject::shared_ptr object = *iter;
+        object->setAllChanged   ();
+        object->setSchemaNeeded ();
+    }
+}
+
 void ManagementAgent::PeriodicProcessing (void)
 {
 #define BUFSIZE   65536
@@ -69,10 +75,9 @@
     Buffer    msgBuffer (msgChars, BUFSIZE);
     uint32_t  contentSize;
 
-    //QPID_LOG (debug, "Timer Fired");
     if (managementObjects.empty ())
-       return;
-       
+        return;
+        
     Message::shared_ptr msg (new Message ());
 
     // Build the magic number for the management message.
@@ -82,74 +87,75 @@
     msgBuffer.putOctet ('1');
 
     for (ManagementObjectList::iterator iter = managementObjects.begin ();
-        iter != managementObjects.end ();
-        iter++)
+         iter != managementObjects.end ();
+         iter++)
     {
-       ManagementObject::shared_ptr objectPtr = *iter;
+        ManagementObject::shared_ptr object = *iter;
 
-       //QPID_LOG (debug, "    Object Found...");
-       
-       if (objectPtr->getSchemaNeeded ())
-       {
-           //QPID_LOG (debug, "        Generating Schema");
-           uint32_t startAvail = msgBuffer.available ();
-           uint32_t recordLength;
-           
-           msgBuffer.putOctet ('S');  // opcode = Schema Record
-           msgBuffer.putOctet (0);    // content-class = N/A
-           msgBuffer.putShort (objectPtr->getObjectType ());
-           msgBuffer.record   (); // Record the position of the length field
-           msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
-
-           objectPtr->writeSchema (msgBuffer);
-           recordLength = startAvail - msgBuffer.available ();
-           msgBuffer.restore (true);         // Restore pointer to length field
-           msgBuffer.putLong (recordLength);
-           msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
-       }
-
-       if (objectPtr->getConfigChanged ())
-       {
-           //QPID_LOG (debug, "        Generating Config");
-           uint32_t startAvail = msgBuffer.available ();
-           uint32_t recordLength;
-           
-           msgBuffer.putOctet ('C');  // opcode = Content Record
-           msgBuffer.putOctet ('C');  // content-class = Configuration
-           msgBuffer.putShort (objectPtr->getObjectType ());
-           msgBuffer.record   (); // Record the position of the length field
-           msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
-
-           objectPtr->writeConfig (msgBuffer);
-           recordLength = startAvail - msgBuffer.available ();
-           msgBuffer.restore (true);         // Restore pointer to length field
-           msgBuffer.putLong (recordLength);
-           msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
-       }
-       
-       if (objectPtr->getInstChanged ())
-       {
-           //QPID_LOG (debug, "        Generating Instrumentation");
-           uint32_t startAvail = msgBuffer.available ();
-           uint32_t recordLength;
-           
-           msgBuffer.putOctet ('C');  // opcode = Content Record
-           msgBuffer.putOctet ('I');  // content-class = Instrumentation
-           msgBuffer.putShort (objectPtr->getObjectType ());
-           msgBuffer.record   (); // Record the position of the length field
-           msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
-
-           objectPtr->writeInstrumentation (msgBuffer);
-           recordLength = startAvail - msgBuffer.available ();
-           msgBuffer.restore (true);         // Restore pointer to length field
-           msgBuffer.putLong (recordLength);
-           msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
-       }
-
-       // Temporary protection against buffer overrun.
-       // This needs to be replaced with frame fragmentation.
-       if (msgBuffer.available () < THRESHOLD)
-           break;
+        if (object->getSchemaNeeded ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('S');  // opcode = Schema Record
+            msgBuffer.putOctet (0);    // content-class = N/A
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeSchema (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length 
field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
+        }
+
+        if (object->getConfigChanged ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('C');  // opcode = Content Record
+            msgBuffer.putOctet ('C');  // content-class = Configuration
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeConfig (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length 
field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
+        }
+        
+        if (object->getInstChanged ())
+        {
+            uint32_t startAvail = msgBuffer.available ();
+            uint32_t recordLength;
+            
+            msgBuffer.putOctet ('C');  // opcode = Content Record
+            msgBuffer.putOctet ('I');  // content-class = Instrumentation
+            msgBuffer.putShort (object->getObjectType ());
+            msgBuffer.record   (); // Record the position of the length field
+            msgBuffer.putLong  (0xFFFFFFFF); // Placeholder for length
+
+            object->writeInstrumentation (msgBuffer);
+            recordLength = startAvail - msgBuffer.available ();
+            msgBuffer.restore (true);         // Restore pointer to length 
field
+            msgBuffer.putLong (recordLength);
+            msgBuffer.restore ();             // Re-restore to get to the end 
of the buffer
+        }
+
+        if (object->isDeleted ())
+        {
+            managementObjects.remove (object);
+            QPID_LOG (debug, "Management Object Removed");
+        }
+
+        // Temporary protection against buffer overrun.
+        // This needs to be replaced with frame fragmentation.
+        if (msgBuffer.available () < THRESHOLD)
+            break;
     }
     
     msgBuffer.putOctet ('X');  // End-of-message
@@ -161,7 +167,7 @@
     msgBuffer.reset ();
 
     AMQFrame method  (0, MessageTransferBody(ProtocolVersion(),
-                                            0, "qpid.management", 0, 0));
+                                             0, "qpid.management", 0, 0));
     AMQFrame header  (0, AMQHeaderBody());
     AMQFrame content;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementAgent.h Thu Oct 25 
19:37:54 2007
@@ -38,21 +38,21 @@
 
     typedef boost::shared_ptr<ManagementAgent> shared_ptr;
 
-    ManagementAgent(uint16_t interval);
+    ManagementAgent (uint16_t interval);
 
-    void setExchange  (Exchange::shared_ptr exchangePtr);
+    void setExchange  (Exchange::shared_ptr         exchange);
     void addObject    (ManagementObject::shared_ptr object);
-    void deleteObject (ManagementObject::shared_ptr object);
+    void clientAdded  (void);
     
   private:
 
     struct Periodic : public TimerTask
     {
         ManagementAgent& agent;
-       
+
         Periodic (ManagementAgent& agent, uint32_t seconds);
-       ~Periodic () {}
-       void fire ();
+        ~Periodic () {}
+        void fire ();
     };
 
     ManagementObjectList managementObjects;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp?rev=588478&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.cpp Thu 
Oct 25 19:37:54 2007
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementExchange::ManagementExchange (const string& _name) :
+    Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const std::string& _name,
+                                        bool               _durable,
+                                        const FieldTable&  _args) :
+    Exchange     (_name, _durable, _args),
+    TopicExchange(_name, _durable, _args) {}
+
+
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+                               const string&     routingKey,
+                               const FieldTable* args)
+{
+    bool result = TopicExchange::bind (queue, routingKey, args);
+
+    // Notify the management agent that a new management client has bound to 
the 
+    // exchange.
+    if (result)
+        managementAgent->clientAdded ();
+
+    return result;
+}
+
+void ManagementExchange::route (Deliverable&      msg,
+                                const string&     routingKey,
+                                const FieldTable* args)
+{
+    // Intercept management commands
+    if (routingKey.length () > 7 &&
+        routingKey.substr (0, 7).compare ("method.") == 0)
+    {
+        QPID_LOG (debug, "ManagementExchange: Intercept command " << 
routingKey);
+        // TODO: Send intercepted commands to ManagementAgent for dispatch
+        return;
+    }
+
+    TopicExchange::route (msg, routingKey, args);
+}
+
+void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+{
+    managementAgent = agent;
+}
+
+
+ManagementExchange::~ManagementExchange() {}
+
+const std::string ManagementExchange::typeName("management");
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h?rev=588478&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementExchange.h Thu Oct 
25 19:37:54 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementExchange_
+#define _ManagementExchange_
+
+#include "TopicExchange.h"
+#include "ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementExchange : public virtual TopicExchange
+{
+  private:
+    ManagementAgent::shared_ptr managementAgent;
+ 
+  public:
+    static const std::string typeName;
+
+    ManagementExchange (const string& name);
+    ManagementExchange (const string& _name, bool _durable, 
+                        const qpid::framing::FieldTable& _args);
+
+    virtual std::string getType() const { return typeName; }            
+
+    virtual bool bind (Queue::shared_ptr queue,
+                       const string&     routingKey,
+                       const qpid::framing::FieldTable* args);
+
+    virtual void route (Deliverable& msg,
+                        const string& routingKey,
+                        const qpid::framing::FieldTable* args);
+
+    void setManagmentAgent (ManagementAgent::shared_ptr agent);
+
+    virtual ~ManagementExchange();
+};
+
+
+}
+}
+
+#endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.cpp Thu Oct 
25 19:37:54 2007
@@ -23,14 +23,19 @@
 
 using namespace qpid::framing;
 using namespace qpid::broker;
+using namespace qpid::sys;
 
 void ManagementObject::schemaItem (Buffer&     buf,
-                                  uint8_t     typeCode,
-                                  std::string name,
-                                  std::string description,
-                                  bool        isConfig)
+                                   uint8_t     typeCode,
+                                   std::string name,
+                                   std::string description,
+                                   bool        isConfig,
+                                   bool        isIndex)
 {
-    buf.putOctet       (isConfig ? 1 : 0);
+    uint8_t flags =
+        (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
+
+    buf.putOctet       (flags);
     buf.putOctet       (typeCode);
     buf.putShortString (name);
     buf.putShortString (description);
@@ -38,5 +43,12 @@
 
 void ManagementObject::schemaListEnd (Buffer& buf)
 {
-    buf.putOctet (0xFF);
+    buf.putOctet (FLAG_END);
+}
+
+void ManagementObject::writeTimestamps (Buffer& buf)
+{
+    buf.putLongLong (uint64_t (Duration (now ())));
+    buf.putLongLong (createTime);
+    buf.putLongLong (destroyTime);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObject.h Thu Oct 25 
19:37:54 2007
@@ -31,24 +31,30 @@
 namespace broker {
 
 using namespace qpid::framing;
+using namespace qpid::sys;
+
+const uint16_t OBJECT_SYSTEM      = 1;
+const uint16_t OBJECT_BROKER      = 2;
+const uint16_t OBJECT_VHOST       = 3;
+const uint16_t OBJECT_QUEUE       = 4;
+const uint16_t OBJECT_EXCHANGE    = 5;
+const uint16_t OBJECT_BINDING     = 6;
+const uint16_t OBJECT_CLIENT      = 7;
+const uint16_t OBJECT_SESSION     = 8;
+const uint16_t OBJECT_DESTINATION = 9;
+const uint16_t OBJECT_PRODUCER    = 10;
+const uint16_t OBJECT_CONSUMER    = 11;
 
-const uint16_t OBJECT_BROKER    = 1;
-const uint16_t OBJECT_SERVER    = 2;
-const uint16_t OBJECT_QUEUE     = 3;
-const uint16_t OBJECT_EXCHANGE  = 4;
-const uint16_t OBJECT_BINDING   = 5;
 
 class ManagementObject
 {
-  private:
-  
-    qpid::sys::AbsTime       createTime;
-    qpid::sys::AbsTime       destroyTime;
-
   protected:
     
-    bool  configChanged;
-    bool  instChanged;
+    uint64_t createTime;
+    uint64_t destroyTime;
+    bool     configChanged;
+    bool     instChanged;
+    bool     deleted;
     
     static const uint8_t TYPE_UINT8  = 1;
     static const uint8_t TYPE_UINT16 = 2;
@@ -56,18 +62,26 @@
     static const uint8_t TYPE_UINT64 = 4;
     static const uint8_t TYPE_BOOL   = 5;
     static const uint8_t TYPE_STRING = 6;
+
+    static const uint8_t FLAG_CONFIG = 0x01;
+    static const uint8_t FLAG_INDEX  = 0x02;
+    static const uint8_t FLAG_END    = 0x80;
     
     void schemaItem (Buffer&     buf,
-                    uint8_t     typeCode,
-                    std::string name,
-                    std::string description,
-                    bool        isConfig = false);
-    void schemaListEnd (Buffer & buf);
+                     uint8_t     typeCode,
+                     std::string name,
+                     std::string description,
+                     bool        isConfig = false,
+                     bool        isIndex  = false);
+    void schemaListEnd   (Buffer& buf);
+    void writeTimestamps (Buffer& buf);
 
   public:
     typedef boost::shared_ptr<ManagementObject> shared_ptr;
 
-    ManagementObject () : configChanged(true), instChanged(true) { createTime 
= qpid::sys::now (); }
+    ManagementObject () : destroyTime(0), configChanged(true),
+                          instChanged(true), deleted(false)
+    { createTime = uint64_t (Duration (now ())); }
     virtual ~ManagementObject () {}
 
     virtual uint16_t    getObjectType        (void)        = 0;
@@ -76,10 +90,21 @@
     virtual void        writeConfig          (Buffer& buf) = 0;
     virtual void        writeInstrumentation (Buffer& buf) = 0;
     virtual bool        getSchemaNeeded      (void)        = 0;
-    
+    virtual void        setSchemaNeeded      (void)        = 0;
+
     inline bool getConfigChanged (void) { return configChanged; }
     inline bool getInstChanged   (void) { return instChanged; }
-    inline void resourceDestroy  (void) { destroyTime = qpid::sys::now (); }
+    inline void setAllChanged    (void)
+    {
+        configChanged = true;
+        instChanged   = true;
+    }
+
+    inline void resourceDestroy  (void) {
+        destroyTime = uint64_t (Duration (now ()));
+        deleted     = true;
+    }
+    bool isDeleted (void) { return deleted; }
 
 };
 

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.cpp Thu 
Oct 25 19:37:54 2007
@@ -76,7 +76,9 @@
 
 void ManagementObjectQueue::writeSchema (Buffer& buf)
 {
-    schemaItem (buf, TYPE_STRING, "name",                "Queue Name", true);
+    schemaNeeded = false;
+
+    schemaItem (buf, TYPE_STRING, "name",                "Queue Name", true, 
true);
     schemaItem (buf, TYPE_BOOL,   "durable",             "Durable",    true);
     schemaItem (buf, TYPE_BOOL,   "autoDelete",          "AutoDelete", true);
 
@@ -115,21 +117,24 @@
     schemaItem (buf, TYPE_UINT32, "consumersHigh",       "Consumer high water 
mark this interval");
 
     schemaListEnd (buf);
-
-    schemaNeeded = false;
 }
 
 void ManagementObjectQueue::writeConfig (Buffer& buf)
 {
+    configChanged = false;
+
+    writeTimestamps    (buf);
     buf.putShortString (name);
     buf.putOctet       (durable    ? 1 : 0);
     buf.putOctet       (autoDelete ? 1 : 0);
-    
-    configChanged = false;
 }
 
 void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
 {
+    instChanged = false;
+
+    writeTimestamps (buf);
+    buf.putShortString (name);
     buf.putLongLong (msgTotalEnqueues);
     buf.putLongLong (msgTotalDequeues);
     buf.putLongLong (msgTxEnqueues);
@@ -164,5 +169,14 @@
     buf.putLong     (consumersLow);
     buf.putLong     (consumersHigh);
 
-    instChanged = false;
+    msgDepthLow        = msgDepth;
+    msgDepthHigh       = msgDepth;
+    byteDepthLow       = byteDepth;
+    byteDepthHigh      = byteDepth;
+    enqueueTxCountLow  = enqueueTxCount;
+    enqueueTxCountHigh = enqueueTxCount;
+    dequeueTxCountLow  = dequeueTxCount;
+    dequeueTxCountHigh = dequeueTxCount;
+    consumersLow       = consumers;
+    consumersHigh      = consumers;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ManagementObjectQueue.h Thu 
Oct 25 19:37:54 2007
@@ -89,28 +89,29 @@
     void        writeConfig          (Buffer& buf);
     void        writeInstrumentation (Buffer& buf);
     bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    void        setSchemaNeeded      (void) { schemaNeeded = true; }
     
     inline void adjustQueueHiLo (void){
-       if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
-       if (msgDepth < msgDepthLow)  msgDepthLow  = msgDepth;
+        if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+        if (msgDepth < msgDepthLow)  msgDepthLow  = msgDepth;
 
-       if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
-       if (byteDepth < byteDepthLow)  byteDepthLow  = byteDepth;
-       instChanged = true;
+        if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+        if (byteDepth < byteDepthLow)  byteDepthLow  = byteDepth;
+        instChanged = true;
     }
     
     inline void adjustTxHiLo (void){
-       if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = 
enqueueTxCount;
-       if (enqueueTxCount < enqueueTxCountLow)  enqueueTxCountLow  = 
enqueueTxCount;
-       if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = 
dequeueTxCount;
-       if (dequeueTxCount < dequeueTxCountLow)  dequeueTxCountLow  = 
dequeueTxCount;
-       instChanged = true;
+        if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = 
enqueueTxCount;
+        if (enqueueTxCount < enqueueTxCountLow)  enqueueTxCountLow  = 
enqueueTxCount;
+        if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = 
dequeueTxCount;
+        if (dequeueTxCount < dequeueTxCountLow)  dequeueTxCountLow  = 
dequeueTxCount;
+        instChanged = true;
     }
     
     inline void adjustConsumerHiLo (void){
-       if (consumers > consumersHigh) consumersHigh = consumers;
-       if (consumers < consumersLow)  consumersLow  = consumers;
-       instChanged = true;
+        if (consumers > consumersHigh) consumersHigh = consumers;
+        if (consumers < consumersLow)  consumersLow  = consumers;
+        instChanged = true;
     }
 
   public:
@@ -124,51 +125,51 @@
     // messages when counting statistics.
 
     inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
-       msgTotalEnqueues++;
-       byteTotalEnqueues += bytes;
-       
-       if (attrMask & MSG_MASK_TX){
-           msgTxEnqueues++;
-           byteTxEnqueues += bytes;
-       }
-       
-       if (attrMask & MSG_MASK_PERSIST){
-           msgPersistEnqueues++;
-           bytePersistEnqueues += bytes;
-       }
-
-       msgDepth++;
-       byteDepth += bytes;
-       adjustQueueHiLo ();
+        msgTotalEnqueues++;
+        byteTotalEnqueues += bytes;
+        
+        if (attrMask & MSG_MASK_TX){
+            msgTxEnqueues++;
+            byteTxEnqueues += bytes;
+        }
+        
+        if (attrMask & MSG_MASK_PERSIST){
+            msgPersistEnqueues++;
+            bytePersistEnqueues += bytes;
+        }
+
+        msgDepth++;
+        byteDepth += bytes;
+        adjustQueueHiLo ();
     }
     
     inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
-       msgTotalDequeues++;
-       byteTotalDequeues += bytes;
+        msgTotalDequeues++;
+        byteTotalDequeues += bytes;
 
-       if (attrMask & MSG_MASK_TX){
-           msgTxDequeues++;
-           byteTxDequeues += bytes;
-       }
-       
-       if (attrMask & MSG_MASK_PERSIST){
-           msgPersistDequeues++;
-           bytePersistDequeues += bytes;
-       }
-
-       msgDepth--;
-       byteDepth -= bytes;
-       adjustQueueHiLo ();
+        if (attrMask & MSG_MASK_TX){
+            msgTxDequeues++;
+            byteTxDequeues += bytes;
+        }
+        
+        if (attrMask & MSG_MASK_PERSIST){
+            msgPersistDequeues++;
+            bytePersistDequeues += bytes;
+        }
+
+        msgDepth--;
+        byteDepth -= bytes;
+        adjustQueueHiLo ();
     }
     
     inline void incConsumers (void){
-       consumers++;
-       adjustConsumerHiLo ();
+        consumers++;
+        adjustConsumerHiLo ();
     }
     
     inline void decConsumers (void){
-       consumers--;
-       adjustConsumerHiLo ();
+        consumers--;
+        adjustConsumerHiLo ();
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 25 19:37:54 
2007
@@ -293,6 +293,10 @@
         }
         browsers.push_back(c);
     }
+
+    if (mgmtObjectPtr != 0){
+        mgmtObjectPtr->incConsumers ();
+    }
 }
 
 void Queue::cancel(Consumer::ptr c){
@@ -301,6 +305,9 @@
         cancel(c, acquirers);
     } else {
         cancel(c, browsers);
+    }
+    if (mgmtObjectPtr != 0){
+        mgmtObjectPtr->decConsumers ();
     }
     if(exclusive == c) exclusive.reset();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=588478&r1=588477&r2=588478&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Oct 25 
19:37:54 2007
@@ -21,6 +21,7 @@
 #include "QueueRegistry.h"
 #include "ManagementAgent.h"
 #include "ManagementObjectQueue.h"
+#include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>
 
@@ -42,33 +43,32 @@
     QueueMap::iterator i =  queues.find(name);
 
     if (i == queues.end()) {
-       Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 
0, owner));
-       queues[name] = queue;
+        Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 
0, owner));
+        queues[name] = queue;
 
-       if (managementAgent){
-           ManagementObjectQueue::shared_ptr mgmtObject(new 
ManagementObjectQueue (name, durable, autoDelete));
+        if (managementAgent){
+            ManagementObjectQueue::shared_ptr mgmtObject(new 
ManagementObjectQueue (name, durable, autoDelete));
 
-           queue->setMgmt (mgmtObject);
-           
managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
-       }
-       
-       return std::pair<Queue::shared_ptr, bool>(queue, true);
+            queue->setMgmt (mgmtObject);
+            
managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
+        }
+
+        return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
-       return std::pair<Queue::shared_ptr, bool>(i->second, false);
+        return std::pair<Queue::shared_ptr, bool>(i->second, false);
     }
 }
 
 void QueueRegistry::destroy(const string& name){
     RWlock::ScopedWlock locker(lock);
-
     if (managementAgent){
-       ManagementObjectQueue::shared_ptr mgmtObject;
-       QueueMap::iterator i = queues.find(name);
+        ManagementObjectQueue::shared_ptr mgmtObject;
+        QueueMap::iterator i = queues.find(name);
 
-       if (i != queues.end()){
-           mgmtObject = i->second->getMgmt ();
-           managementAgent->deleteObject 
(dynamic_pointer_cast<ManagementObject>(mgmtObject));
-       }
+        if (i != queues.end()){
+            mgmtObject = i->second->getMgmt ();
+            mgmtObject->resourceDestroy ();
+        }
     }
 
     queues.erase(name);
@@ -79,20 +79,20 @@
     QueueMap::iterator i = queues.find(name);
     
     if (i == queues.end()) {
-       return Queue::shared_ptr();
+        return Queue::shared_ptr();
     } else {
-       return i->second;
+        return i->second;
     }
 }
 
 string QueueRegistry::generateName(){
     string name;
     do {
-       std::stringstream ss;
-       ss << "tmp_" << counter++;
-       name = ss.str();
-       // Thread safety: Private function, only called with lock held
-       // so this is OK.
+        std::stringstream ss;
+        ss << "tmp_" << counter++;
+        name = ss.str();
+        // Thread safety: Private function, only called with lock held
+        // so this is OK.
     } while(queues.find(name) != queues.end());
     return name;
 }


Reply via email to