Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,76 @@
+#ifndef _ManagementBroker_
+#define _ManagementBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include "qpid/Options.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid { 
+namespace management {
+
+//  Management object for the broker itself.  It reports the fixed object
+//  type OBJECT_BROKER and the object name "broker", and publishes no
+//  instrumentation: writeInstrumentation is empty and getInstChanged
+//  always answers false.
+class Broker : public ManagementObject
+{
+  public:
+
+    typedef boost::shared_ptr<Broker> shared_ptr;
+
+    //  Both are defined out of line.
+    //  NOTE(review): conf presumably supplies the configuration values
+    //  stored below -- confirm against the implementation file.
+    Broker  (Manageable* coreObject, const Options& conf);
+    ~Broker ();
+
+  private:
+
+    //  One flag shared by every Broker instance; read and set through
+    //  getSchemaNeeded / setSchemaNeeded below.
+    static bool schemaNeeded;
+
+    //  Per-broker values.  NOTE(review): presumably encoded by
+    //  writeConfig -- confirm against the implementation file.
+    std::string sysId;
+    uint16_t    port;
+    uint16_t    workerThreads;
+    uint16_t    maxConns;
+    uint16_t    connBacklog;
+    uint32_t    stagingThreshold;
+    std::string storeLib;
+    bool        asyncStore;
+    uint16_t    mgmtPubInterval;
+    uint32_t    initialDiskPageSize;
+    uint32_t    initialPagesPerQueue;
+    std::string clusterName;
+    std::string version;
+
+    //  Implementations of the ManagementObject interface.
+    uint16_t    getObjectType        () { return OBJECT_BROKER; }
+    std::string getObjectName        () { return "broker"; }
+    void        writeSchema          (qpid::framing::Buffer& buf);
+    void        writeConfig          (qpid::framing::Buffer& buf);
+    void        writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
+    bool        getSchemaNeeded      () { return schemaNeeded; }
+    void        setSchemaNeeded      () { schemaNeeded = true; }
+    void        doMethod             (std::string            methodName,
+                                      qpid::framing::Buffer& inBuf,
+                                      qpid::framing::Buffer& outBuf);
+
+    //  The broker has no instrumentation, so it never reports a change.
+    bool        getInstChanged       () { return false; }
+};
+
+}}
+
+
+#endif  /*!_ManagementBroker_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,67 @@
+#ifndef _Manageable_
+#define _Manageable_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "ManagementObject.h"
+#include "Args.h"
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qpid { 
+namespace management {
+
+//  Interface implemented by components that expose themselves through a
+//  management object and accept management method calls.
+class Manageable
+{
+  public:
+
+    //  Declared pure virtual; the (empty) body is supplied right after the
+    //  class definition.
+    virtual ~Manageable () = 0;
+
+    //  status_t carries the completion status back from the method handler.
+    typedef uint32_t status_t;
+
+    static const status_t STATUS_OK              = 0;
+    static const status_t STATUS_UNKNOWN_OBJECT  = 1;
+    static const status_t STATUS_UNKNOWN_METHOD  = 2;
+
+    //  Each Manageable holds a reference to exactly one management object,
+    //  always of a class derived from the abstract ManagementObject.
+    //  This accessor hands out a shared_ptr to that object.
+    virtual ManagementObject::shared_ptr GetManagementObject () const = 0;
+
+    //  Invoked when a remote management client calls a method on this
+    //  object.  The arguments are specific to the method identified by
+    //  methodId and must be down-cast to the matching Args subclass before
+    //  they are used.
+    virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+};
+
+//  Body for the pure virtual destructor declared above.
+inline Manageable::~Manageable () {}
+
+}}
+
+#endif  /*!_Manageable_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon 
Nov 12 16:34:09 2007
@@ -0,0 +1,325 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "ManagementAgent.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+#include <list>
+
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+ManagementAgent::shared_ptr ManagementAgent::agent;  // Singleton instance; stays empty until getAgent() creates it
+
+//  Stores the reporting interval (in seconds), schedules the first periodic
+//  timer task and seeds the object-id counter from the current time.
+ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+{
+    TimerTask::shared_ptr firstTick (new Periodic (*this, interval));
+    timer.add (firstTick);
+
+    nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+}
+
+//  Returns the shared agent instance, creating it with a 10 second interval
+//  on the first call.  There is no locking around the creation.
+ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
+{
+    if (!agent)
+        agent.reset (new ManagementAgent (10));
+
+    return agent;
+}
+
+//  Remembers the two exchanges the agent publishes to: _mexchange receives
+//  the periodic management messages, _dexchange the method replies.
+void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
+                                   Exchange::shared_ptr _dexchange)
+{
+    dExchange = _dexchange;
+    mExchange = _mexchange;
+}
+
+//  Registers a management object: assigns it the next free object id and
+//  stores it in the object map under that id.
+void ManagementAgent::addObject (ManagementObject::shared_ptr object)
+{
+    const uint64_t assignedId = nextObjectId;
+    ++nextObjectId;
+
+    object->setObjectId (assignedId);
+    managementObjects[assignedId] = object;
+}
+
+//  Timer task that is due _seconds from now and reports back to _agent.
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent,
+                                     uint32_t         _seconds)
+    : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)),
+      agent (_agent)
+{
+}
+
+//  Timer callback: schedules the next tick using the agent's current
+//  interval, then runs one round of periodic processing.
+void ManagementAgent::Periodic::fire ()
+{
+    TimerTask::shared_ptr nextTick (new Periodic (agent, agent.interval));
+    agent.timer.add (nextTick);
+
+    agent.PeriodicProcessing ();
+}
+
+//  Called when a new management client appears: marks every registered
+//  object as changed and as needing its schema sent again.
+void ManagementAgent::clientAdded (void)
+{
+    ManagementObjectMap::iterator cursor = managementObjects.begin ();
+
+    while (cursor != managementObjects.end ())
+    {
+        ManagementObject& managed = *cursor->second;
+
+        managed.setAllChanged   ();
+        managed.setSchemaNeeded ();
+        ++cursor;
+    }
+}
+
+namespace {
+
+//  Size of the on-stack buffer in which one management message is built.
+const uint32_t MGMT_BUFSIZE   = 65536;
+
+//  Once fewer than this many bytes remain in the buffer no further objects
+//  are added to the current message.
+const uint32_t MGMT_THRESHOLD = 16384;
+
+//  Member function of ManagementObject that encodes a record body.
+typedef void (ManagementObject::*RecordWriter) (Buffer&);
+
+//  Appends one record to buf: opcode, content-class, object type, a 32-bit
+//  length and the body produced by writeBody.  The length counts the whole
+//  record, header included, and is patched in after the body is written.
+void appendRecord (Buffer&           buf,
+                   char              opcode,
+                   char              contentClass,
+                   ManagementObject& object,
+                   RecordWriter      writeBody)
+{
+    const uint32_t startAvail = buf.available ();
+
+    buf.putOctet (opcode);
+    buf.putOctet (contentClass);
+    buf.putShort (object.getObjectType ());
+    buf.record   ();                 // Record the position of the length field
+    buf.putLong  (0xFFFFFFFF);       // Placeholder for length
+
+    (object.*writeBody) (buf);
+
+    const uint32_t recordLength = startAvail - buf.available ();
+    buf.restore (true);              // Restore pointer to length field
+    buf.putLong (recordLength);
+    buf.restore ();                  // Re-restore to get to the end of the buffer
+}
+
+}
+
+//  Builds one management message describing the registered objects and
+//  routes it through the management exchange with routing key "mgmt".
+//
+//  For each object, in map order, a schema record is written if the object
+//  asks for one, a configuration record if its configuration changed and an
+//  instrumentation record if its instrumentation changed.  Objects flagged
+//  as deleted are removed from the map after the message has been routed.
+//
+//  Nothing is done while no objects are registered or while no management
+//  exchange has been supplied through setExchange; the timer is armed in
+//  the constructor, so this can be called before setExchange.
+void ManagementAgent::PeriodicProcessing (void)
+{
+    if (managementObjects.empty ())
+        return;
+
+    // Without an exchange there is nowhere to publish to.
+    if (mExchange.get () == 0)
+        return;
+
+    char      msgChars[MGMT_BUFSIZE];
+    Buffer    msgBuffer (msgChars, MGMT_BUFSIZE);
+    uint32_t  contentSize;
+    std::list<uint64_t> deleteList;
+
+    Message::shared_ptr msg (new Message ());
+
+    // Build the magic number for the management message.
+    msgBuffer.putOctet ('A');
+    msgBuffer.putOctet ('M');
+    msgBuffer.putOctet ('0');
+    msgBuffer.putOctet ('1');
+
+    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         ++iter)
+    {
+        ManagementObject::shared_ptr object = iter->second;
+
+        // opcode 'S' = Schema Record, content-class = N/A
+        if (object->getSchemaNeeded ())
+            appendRecord (msgBuffer, 'S', 0, *object,
+                          &ManagementObject::writeSchema);
+
+        // opcode 'C' = Content Record, content-class 'C' = Configuration
+        if (object->getConfigChanged ())
+            appendRecord (msgBuffer, 'C', 'C', *object,
+                          &ManagementObject::writeConfig);
+
+        // opcode 'C' = Content Record, content-class 'I' = Instrumentation
+        if (object->getInstChanged ())
+            appendRecord (msgBuffer, 'C', 'I', *object,
+                          &ManagementObject::writeInstrumentation);
+
+        if (object->isDeleted ())
+            deleteList.push_back (iter->first);
+
+        // Temporary protection against buffer overrun.
+        // This needs to be replaced with frame fragmentation.
+        if (msgBuffer.available () < MGMT_THRESHOLD)
+            break;
+    }
+
+    msgBuffer.putOctet ('X');  // End-of-message
+    msgBuffer.putOctet (0);
+    msgBuffer.putShort (0);
+    msgBuffer.putLong  (8);
+
+    contentSize = MGMT_BUFSIZE - msgBuffer.available ();
+    msgBuffer.reset ();
+
+    AMQFrame method  (0, MessageTransferBody(ProtocolVersion(),
+                                             0, "qpid.management", 0, 0));
+    AMQFrame header  (0, AMQHeaderBody());
+    AMQFrame content;
+
+    content.setBody(AMQContentBody());
+    content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
+
+    method.setEof  (false);
+    header.setBof  (false);
+    header.setEof  (false);
+    content.setBof (false);
+
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+
+    MessageProperties* props =
+        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(contentSize);
+    msg->getFrames().append(content);
+
+    DeliverableMessage deliverable (msg);
+    mExchange->route (deliverable, "mgmt", 0);
+
+    // Delete flagged objects
+    for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+         iter != deleteList.rend ();
+         ++iter)
+    {
+        managementObjects.erase (*iter);
+    }
+}
+
+namespace {
+
+//  Offset of the reply-to short string inside a method request: it follows
+//  the 4-byte method id and the 8-byte object id.
+const uint32_t METHOD_REPLY_TO_OFFSET = 12;
+
+//  Smallest acceptable request: method id, object id and the length octet
+//  of the reply-to short string.
+const uint32_t METHOD_REQUEST_MIN     = METHOD_REPLY_TO_OFFSET + 1;
+
+//  Largest request body that will be copied into memory.
+const uint32_t METHOD_REQUEST_MAX     = 65536;
+
+//  Size of the on-stack buffer in which the reply is assembled.
+const uint32_t METHOD_REPLY_BUFSIZE   = 4096;
+
+//  Owns a new[]-allocated character array and releases it with delete[]
+//  on every exit path, including exceptions.
+struct ScopedCharArray
+{
+    char* const mem;
+
+    explicit ScopedCharArray (uint32_t size) : mem (new char[size]) {}
+    ~ScopedCharArray () { delete [] mem; }
+
+  private:
+    ScopedCharArray (const ScopedCharArray&);
+    ScopedCharArray& operator= (const ScopedCharArray&);
+};
+
+}
+
+//  Handles a management method request routed with a key of the form
+//  "method.<package>.<class>.<method>".
+//
+//  The message content holds a 32-bit method id, a 64-bit object id and a
+//  short-string reply-to routing key; any remaining bytes are left in the
+//  input buffer for the target object's doMethod.  The reply (method id
+//  followed by whatever doMethod wrote, or status 2 and "Invalid Object Id"
+//  when the object id is unknown) is routed through the direct exchange
+//  using the reply-to key.
+//
+//  Malformed routing keys and requests whose size or reply-to length do not
+//  fit the layout above are logged at debug level and dropped.
+void ManagementAgent::dispatchCommand (Deliverable&      deliverable,
+                                       const string&     routingKey,
+                                       const FieldTable* /*args*/)
+{
+    size_t    pos, start;
+    Message&  msg = static_cast<DeliverableMessage&> (deliverable).getMessage ();
+    uint32_t  contentSize;
+
+    if (routingKey.compare (0, 7, "method.") != 0)
+    {
+        QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
+        return;
+    }
+
+    start = 7;
+    if (routingKey.length () == start)
+    {
+        QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
+        return;
+    }
+
+    pos = routingKey.find ('.', start);
+    if (pos == string::npos || routingKey.length () == pos + 1)
+    {
+        QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
+        return;
+    }
+
+    string packageName = routingKey.substr (start, pos - start);
+
+    start = pos + 1;
+    pos = routingKey.find ('.', start);
+    if (pos == string::npos || routingKey.length () == pos + 1)
+    {
+        QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
+        return;
+    }
+
+    string className = routingKey.substr (start, pos - start);
+
+    start = pos + 1;
+    string methodName = routingKey.substr (start, routingKey.length () - start);
+
+    QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
+              << className << ", method: " << methodName);
+
+    contentSize = msg.encodedContentSize ();
+    if (contentSize < METHOD_REQUEST_MIN || contentSize > METHOD_REQUEST_MAX)
+    {
+        QPID_LOG (debug, "Invalid method request size: " << contentSize);
+        return;
+    }
+
+    ScopedCharArray inMem (contentSize);
+    char   outMem[METHOD_REPLY_BUFSIZE]; // TODO Fix This
+    Buffer inBuffer  (inMem.mem, contentSize);
+    Buffer outBuffer (outMem, METHOD_REPLY_BUFSIZE);
+
+    msg.encodeContent (inBuffer);
+    inBuffer.reset ();
+
+    // The request comes from a remote client: make sure the reply-to string
+    // announced by its length octet really fits inside the content before
+    // decoding it.
+    const uint32_t replyToLen =
+        static_cast<uint8_t> (inMem.mem[METHOD_REPLY_TO_OFFSET]);
+    if (METHOD_REQUEST_MIN + replyToLen > contentSize)
+    {
+        QPID_LOG (debug, "Truncated reply-to in method request, len = "
+                  << contentSize);
+        return;
+    }
+
+    uint32_t methodId = inBuffer.getLong     ();
+    uint64_t objId    = inBuffer.getLongLong ();
+    string   replyTo;
+
+    inBuffer.getShortString (replyTo);
+
+    QPID_LOG (debug, "    len = " << contentSize << ", methodId = " <<
+              methodId << ", objId = " << objId);
+
+    outBuffer.putLong (methodId);
+
+    ManagementObjectMap::iterator iter = managementObjects.find (objId);
+    if (iter == managementObjects.end ())
+    {
+        outBuffer.putLong        (2);
+        outBuffer.putShortString ("Invalid Object Id");
+    }
+    else
+    {
+        iter->second->doMethod (methodName, inBuffer, outBuffer);
+    }
+
+    Message::shared_ptr outMsg (new Message ());
+    uint32_t            msgSize = METHOD_REPLY_BUFSIZE - outBuffer.available ();
+    outBuffer.reset ();
+    AMQFrame method  (0, MessageTransferBody(ProtocolVersion(),
+                                             0, "amq.direct", 0, 0));
+    AMQFrame header  (0, AMQHeaderBody());
+    AMQFrame content;
+
+    content.setBody(AMQContentBody());
+    content.castBody<AMQContentBody>()->decode(outBuffer, msgSize);
+
+    method.setEof  (false);
+    header.setBof  (false);
+    header.setEof  (false);
+    content.setBof (false);
+
+    outMsg->getFrames().append(method);
+    outMsg->getFrames().append(header);
+
+    MessageProperties* props =
+        outMsg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(msgSize);
+    outMsg->getFrames().append(content);
+
+    DeliverableMessage outDeliverable (outMsg);
+    dExchange->route (outDeliverable, replyTo, 0);
+
+    // inMem is released by ScopedCharArray's destructor.
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Nov 
12 16:34:09 2007
@@ -0,0 +1,81 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "ManagementObject.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid { 
+namespace management {
+
+//  Singleton that keeps the set of registered management objects, publishes
+//  their state periodically and dispatches incoming management commands.
+//  Obtain the instance through getAgent(); the constructor is private.
+class ManagementAgent
+{
+  public:
+
+    typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+
+    //  Returns the shared instance, creating it on first use.
+    static shared_ptr getAgent ();
+
+    //  Interval, in seconds, used when the next periodic task is scheduled.
+    void setInterval     (uint16_t _interval) { interval = _interval; }
+
+    //  Supplies the exchange for management messages and the exchange for
+    //  method replies.
+    void setExchange     (broker::Exchange::shared_ptr  mgmtExchange,
+                          broker::Exchange::shared_ptr  directExchange);
+
+    //  Registers an object and assigns it an object id.
+    void addObject       (ManagementObject::shared_ptr object);
+
+    //  Marks all objects as changed so a new client receives everything.
+    void clientAdded     ();
+
+    //  Handles a management method request.
+    void dispatchCommand (broker::Deliverable&             msg,
+                          const std::string&               routingKey,
+                          const qpid::framing::FieldTable* args);
+
+  private:
+
+    ManagementAgent (uint16_t interval);
+
+    //  Timer task that triggers PeriodicProcessing on the owning agent.
+    struct Periodic : public broker::TimerTask
+    {
+        ManagementAgent& agent;
+
+        Periodic (ManagementAgent& agent, uint32_t seconds);
+        ~Periodic () {}
+        void fire ();
+    };
+
+    static shared_ptr            agent;              // The singleton instance
+    ManagementObjectMap          managementObjects;  // Registered objects by id
+    broker::Timer                timer;
+    broker::Exchange::shared_ptr mExchange;          // Management messages
+    broker::Exchange::shared_ptr dExchange;          // Method replies
+    uint16_t                     interval;           // Seconds between reports
+    uint64_t                     nextObjectId;
+
+    void PeriodicProcessing ();
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementAgent_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
Mon Nov 12 16:34:09 2007
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+//  Creates a management exchange with the given name.  Exchange is
+//  initialised here as well as TopicExchange.
+ManagementExchange::ManagementExchange (const string& _name)
+    : Exchange      (_name),
+      TopicExchange (_name)
+{
+}
+//  Creates a management exchange with explicit durability and arguments,
+//  passing all three values to both Exchange and TopicExchange.
+ManagementExchange::ManagementExchange (const std::string& _name,
+                                        bool               _durable,
+                                        const FieldTable&  _args)
+    : Exchange      (_name, _durable, _args),
+      TopicExchange (_name, _durable, _args)
+{
+}
+
+
+//  Binds the queue exactly as TopicExchange does.  When the binding
+//  succeeds the management agent is told that a new management client has
+//  bound to the exchange.  Returns the result of TopicExchange::bind.
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+                               const string&     routingKey,
+                               const FieldTable* args)
+{
+    if (!TopicExchange::bind (queue, routingKey, args))
+        return false;
+
+    managementAgent->clientAdded ();
+    return true;
+}
+
+//  Routes a message.  Keys that start with "method." and carry at least one
+//  more character are management commands and go to the management agent
+//  instead of the bound queues; everything else is routed by TopicExchange.
+void ManagementExchange::route (Deliverable&      msg,
+                                const string&     routingKey,
+                                const FieldTable* args)
+{
+    const bool isCommand = routingKey.length () > 7 &&
+                           routingKey.compare (0, 7, "method.") == 0;
+
+    if (!isCommand)
+    {
+        TopicExchange::route (msg, routingKey, args);
+        return;
+    }
+
+    // Intercept management commands
+    managementAgent->dispatchCommand (msg, routingKey, args);
+}
+
+//  Attaches the agent that bind() notifies and route() forwards commands to.
+void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+{
+    // agent is a by-value copy, so swapping it in leaves the member holding
+    // the new agent; the previous one is released when agent goes out of
+    // scope.
+    managementAgent.swap (agent);
+}
+
+
+ManagementExchange::~ManagementExchange() {}  // Empty body: nothing is released explicitly here
+
+const std::string ManagementExchange::typeName("management");  // Exchange type string; returned by getType()
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Mon 
Nov 12 16:34:09 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementExchange_
+#define _ManagementExchange_
+
+#include "qpid/broker/TopicExchange.h"
+#include "ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+//  Topic exchange of type "management".  Besides the normal topic routing
+//  it notifies the management agent of new bindings and hands messages
+//  whose routing key starts with "method." to that agent.
+class ManagementExchange : public virtual TopicExchange
+{
+  public:
+    //  Type string reported by getType().
+    static const std::string typeName;
+
+    ManagementExchange (const string& name);
+    ManagementExchange (const string& _name, bool _durable,
+                        const qpid::framing::FieldTable& _args);
+
+    virtual std::string getType() const { return typeName; }
+
+    //  Binds like a topic exchange and tells the agent about the new client.
+    virtual bool bind (Queue::shared_ptr queue,
+                       const string&     routingKey,
+                       const qpid::framing::FieldTable* args);
+
+    //  Intercepts management commands, routes everything else normally.
+    virtual void route (Deliverable& msg,
+                        const string& routingKey,
+                        const qpid::framing::FieldTable* args);
+
+    //  Supplies the agent used by bind() and route().
+    void setManagmentAgent (management::ManagementAgent::shared_ptr agent);
+
+    virtual ~ManagementExchange();
+
+  private:
+    management::ManagementAgent::shared_ptr managementAgent;
+};
+
+
+}
+}
+
+#endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Mon 
Nov 12 16:34:09 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "Manageable.h"
+#include "ManagementObject.h"
+
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+
+//  Encodes one schema entry into buf: a flags octet (FLAG_CONFIG and/or
+//  FLAG_INDEX), the type code, then name and description as short strings.
+void ManagementObject::schemaItem (Buffer&     buf,
+                                   uint8_t     typeCode,
+                                   std::string name,
+                                   std::string description,
+                                   bool        isConfig,
+                                   bool        isIndex)
+{
+    uint8_t flags = 0;
+
+    if (isConfig)
+        flags |= FLAG_CONFIG;
+    if (isIndex)
+        flags |= FLAG_INDEX;
+
+    buf.putOctet       (flags);
+    buf.putOctet       (typeCode);
+    buf.putShortString (name);
+    buf.putShortString (description);
+}
+
+//  Starts a schema list by writing the "id" entry, a 64-bit configuration
+//  element that is also flagged as an index.
+void ManagementObject::schemaListBegin (Buffer& buf)
+{
+    const bool isConfig = true;
+    const bool isIndex  = true;
+
+    schemaItem (buf, TYPE_UINT64, "id", "Object ID", isConfig, isIndex);
+}
+
+//  Terminates a schema list with a single FLAG_END octet.
+void ManagementObject::schemaListEnd (Buffer& buf)
+{
+    const uint8_t terminator = FLAG_END;
+
+    buf.putOctet (terminator);
+}
+
+//  Writes four 64-bit values to buf, in this order: the current time, the
+//  creation time, the destruction time and the object id.
+void ManagementObject::writeTimestamps (Buffer& buf)
+{
+    const uint64_t currentTime = uint64_t (Duration (now ()));
+
+    buf.putLongLong (currentTime);
+    buf.putLongLong (createTime);
+    buf.putLongLong (destroyTime);
+    buf.putLongLong (objectId);
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Mon 
Nov 12 16:34:09 2007
@@ -0,0 +1,125 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qpid { 
+namespace management {
+
+const uint16_t OBJECT_SYSTEM      = 1;   // Object-type codes; a ManagementObject reports one via getObjectType()
+const uint16_t OBJECT_BROKER      = 2;
+const uint16_t OBJECT_VHOST       = 3;
+const uint16_t OBJECT_QUEUE       = 4;
+const uint16_t OBJECT_EXCHANGE    = 5;
+const uint16_t OBJECT_BINDING     = 6;
+const uint16_t OBJECT_CLIENT      = 7;
+const uint16_t OBJECT_SESSION     = 8;
+const uint16_t OBJECT_DESTINATION = 9;
+const uint16_t OBJECT_PRODUCER    = 10;
+const uint16_t OBJECT_CONSUMER    = 11;
+
+class Manageable;
+
+//  Abstract base of every object published through the management agent.
+//  It holds the object id, creation/destruction timestamps and change flags
+//  common to all management objects, and offers helpers for encoding schema
+//  entries and timestamps.
+class ManagementObject
+{
+  protected:
+
+    uint64_t    createTime;     // Taken from qpid::sys::now() at construction
+    uint64_t    destroyTime;    // 0 until resourceDestroy() is called
+    uint64_t    objectId;       // 0 until setObjectId() is called
+    bool        configChanged;  // Starts out true
+    bool        instChanged;    // Starts out true
+    bool        deleted;        // Set by resourceDestroy()
+    Manageable* coreObject;     // Pointer handed to the constructor
+
+    //  Type codes written by schemaItem.
+    static const uint8_t TYPE_UINT8  = 1;
+    static const uint8_t TYPE_UINT16 = 2;
+    static const uint8_t TYPE_UINT32 = 3;
+    static const uint8_t TYPE_UINT64 = 4;
+    static const uint8_t TYPE_BOOL   = 5;
+    static const uint8_t TYPE_STRING = 6;
+
+    //  Bits of the flags octet written by schemaItem / schemaListEnd.
+    static const uint8_t FLAG_CONFIG = 0x01;
+    static const uint8_t FLAG_INDEX  = 0x02;
+    static const uint8_t FLAG_END    = 0x80;
+
+    //  Encoding helpers for derived classes.
+    void schemaItem      (qpid::framing::Buffer& buf,
+                          uint8_t     typeCode,
+                          std::string name,
+                          std::string description,
+                          bool        isConfig = false,
+                          bool        isIndex  = false);
+    void schemaListBegin (qpid::framing::Buffer& buf);
+    void schemaListEnd   (qpid::framing::Buffer& buf);
+    void writeTimestamps (qpid::framing::Buffer& buf);
+
+  public:
+    typedef boost::shared_ptr<ManagementObject> shared_ptr;
+
+    //  Records _core, marks configuration and instrumentation as changed
+    //  and stamps the creation time.
+    ManagementObject (Manageable* _core) :
+        destroyTime (0), objectId (0), configChanged (true),
+        instChanged (true), deleted (false), coreObject (_core)
+    {
+        createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+    }
+    virtual ~ManagementObject () {}
+
+    //  Interface every concrete management object implements.
+    virtual uint16_t    getObjectType        () = 0;
+    virtual std::string getObjectName        () = 0;
+    virtual void        writeSchema          (qpid::framing::Buffer& buf) = 0;
+    virtual void        writeConfig          (qpid::framing::Buffer& buf) = 0;
+    virtual void        writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+    virtual bool        getSchemaNeeded      () = 0;
+    virtual void        setSchemaNeeded      () = 0;
+    virtual void        doMethod             (std::string            methodName,
+                                              qpid::framing::Buffer& inBuf,
+                                              qpid::framing::Buffer& outBuf) = 0;
+
+    void         setObjectId      (uint64_t oid) { objectId = oid; }
+    uint64_t     getObjectId      () { return objectId; }
+    bool         getConfigChanged () { return configChanged; }
+    virtual bool getInstChanged   () { return instChanged; }
+
+    //  Flags both configuration and instrumentation as changed.
+    void setAllChanged ()
+    {
+        configChanged = true;
+        instChanged   = true;
+    }
+
+    //  Stamps the destruction time and marks the object as deleted.
+    void resourceDestroy ()
+    {
+        destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+        deleted     = true;
+    }
+    bool isDeleted () { return deleted; }
+
+};
+
+ typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap;  // Management objects keyed by a 64-bit id
+
+}}
+            
+
+
+#endif  /*!_ManagementObject_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "Manageable.h" 
+#include "Queue.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Queue::schemaNeeded = true;  // one flag for all queues: cleared by writeSchema(), set again by setSchemaNeeded()
+
+// Build the management record for one queue.
+//
+//   _core       - passed straight through to the ManagementObject base
+//   _parent     - the id of its management object is stored as vhostRef
+//   _name       - queue name
+//   _durable    - durable flag
+//   _autoDelete - auto-delete flag
+//
+// Every counter, gauge and high/low mark starts at zero.  The initialisers
+// are listed in the order the members are declared in Queue.h.
+Queue::Queue (Manageable* _core, Manageable* _parent,
+              const std::string& _name,
+              bool _durable, bool _autoDelete) :
+    ManagementObject    (_core),
+    vhostRef            (_parent->GetManagementObject ()->getObjectId ()),
+    name                (_name),
+    durable             (_durable),
+    autoDelete          (_autoDelete),
+    msgTotalEnqueues    (0),
+    msgTotalDequeues    (0),
+    msgTxEnqueues       (0),
+    msgTxDequeues       (0),
+    msgPersistEnqueues  (0),
+    msgPersistDequeues  (0),
+    msgDepth            (0),
+    msgDepthLow         (0),
+    msgDepthHigh        (0),
+    byteTotalEnqueues   (0),
+    byteTotalDequeues   (0),
+    byteTxEnqueues      (0),
+    byteTxDequeues      (0),
+    bytePersistEnqueues (0),
+    bytePersistDequeues (0),
+    byteDepth           (0),
+    byteDepthLow        (0),
+    byteDepthHigh       (0),
+    enqueueTxStarts     (0),
+    enqueueTxCommits    (0),
+    enqueueTxRejects    (0),
+    enqueueTxCount      (0),
+    enqueueTxCountLow   (0),
+    enqueueTxCountHigh  (0),
+    dequeueTxStarts     (0),
+    dequeueTxCommits    (0),
+    dequeueTxRejects    (0),
+    dequeueTxCount      (0),
+    dequeueTxCountLow   (0),
+    dequeueTxCountHigh  (0),
+    consumers           (0),
+    consumersLow        (0),
+    consumersHigh       (0)
+{
+}
+
+Queue::~Queue () {}  // no explicit cleanup is performed here
+
+void Queue::writeSchema (Buffer& buf)
+{
+    schemaNeeded = false;
+
+    schemaListBegin (buf);
+    schemaItem (buf, TYPE_UINT64, "vhostRef",            "Virtual Host Ref", 
true);
+    schemaItem (buf, TYPE_STRING, "name",                "Queue Name", true);
+    schemaItem (buf, TYPE_BOOL,   "durable",             "Durable",    true);
+    schemaItem (buf, TYPE_BOOL,   "autoDelete",          "AutoDelete", true);
+    schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues",    "Total messages 
enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTotalDequeues",    "Total messages 
dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues",      "Transactional 
messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgTxnDequeues",      "Transactional 
messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues",  "Persistent messages 
enqueued");
+    schemaItem (buf, TYPE_UINT64, "msgPersistDequeues",  "Persistent messages 
dequeued");
+    schemaItem (buf, TYPE_UINT32, "msgDepth",            "Current size of 
queue in messages");
+    schemaItem (buf, TYPE_UINT32, "msgDepthLow",         "Low-water queue 
size, this interval");
+    schemaItem (buf, TYPE_UINT32, "msgDepthHigh",        "High-water queue 
size, this interval");
+    schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues",   "Total messages 
enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTotalDequeues",   "Total messages 
dequeued");
+    schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues",     "Transactional 
messages enqueued");
+    schemaItem (buf, TYPE_UINT64, "byteTxnDequeues",     "Transactional 
messages dequeued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages 
enqueued");
+    schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages 
dequeued");
+    schemaItem (buf, TYPE_UINT32, "byteDepth",           "Current size of 
queue in bytes");
+    schemaItem (buf, TYPE_UINT32, "byteDepthLow",        "Low-water mark this 
interval");
+    schemaItem (buf, TYPE_UINT32, "byteDepthHigh",       "High-water mark this 
interval");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts",    "Total enqueue 
transactions started ");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits",   "Total enqueue 
transactions committed");
+    schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects",   "Total enqueue 
transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxnCount",     "Current pending 
enqueue transactions");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow",  "Low water mark this 
interval");
+    schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this 
interval");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts",    "Total dequeue 
transactions started ");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits",   "Total dequeue 
transactions committed");
+    schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects",   "Total dequeue 
transactions rejected");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxnCount",     "Current pending 
dequeue transactions");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow",  "Transaction low 
water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high 
water mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumers",           "Current consumers on 
queue");
+    schemaItem (buf, TYPE_UINT32, "consumersLow",        "Consumer low water 
mark this interval");
+    schemaItem (buf, TYPE_UINT32, "consumersHigh",       "Consumer high water 
mark this interval");
+    schemaListEnd (buf);
+}
+
+// Write the queue's configuration record into buf and clear configChanged.
+// Layout: timestamps, vhostRef (64 bit), name (short string), then the
+// durable and autoDelete flags as one octet each.
+void Queue::writeConfig (Buffer& buf)
+{
+    // A boolean is sent as a single octet holding 1 or 0.
+    const int durableOctet    = durable    ? 1 : 0;
+    const int autoDeleteOctet = autoDelete ? 1 : 0;
+
+    configChanged = false;
+
+    writeTimestamps (buf);
+    buf.putLongLong (vhostRef);
+    buf.putShortString (name);
+    buf.putOctet (durableOctet);
+    buf.putOctet (autoDeleteOctet);
+}
+
+// Write the queue's statistics record into buf and clear instChanged.
+//
+// The values are written in the same order in which writeSchema() lists the
+// statistics.  After the record has been written, every high/low mark is
+// reset to the current value of its gauge, which starts a new interval.
+void Queue::writeInstrumentation (Buffer& buf)
+{
+    instChanged = false;
+
+    writeTimestamps (buf);
+
+    // Message counters (64 bit) and depth gauge (32 bit)
+    buf.putLongLong (msgTotalEnqueues);
+    buf.putLongLong (msgTotalDequeues);
+    buf.putLongLong (msgTxEnqueues);
+    buf.putLongLong (msgTxDequeues);
+    buf.putLongLong (msgPersistEnqueues);
+    buf.putLongLong (msgPersistDequeues);
+    buf.putLong (msgDepth);
+    buf.putLong (msgDepthLow);
+    buf.putLong (msgDepthHigh);
+
+    // Byte counters (64 bit) and depth gauge (32 bit)
+    buf.putLongLong (byteTotalEnqueues);
+    buf.putLongLong (byteTotalDequeues);
+    buf.putLongLong (byteTxEnqueues);
+    buf.putLongLong (byteTxDequeues);
+    buf.putLongLong (bytePersistEnqueues);
+    buf.putLongLong (bytePersistDequeues);
+    buf.putLong (byteDepth);
+    buf.putLong (byteDepthLow);
+    buf.putLong (byteDepthHigh);
+
+    // Enqueue transactions
+    buf.putLongLong (enqueueTxStarts);
+    buf.putLongLong (enqueueTxCommits);
+    buf.putLongLong (enqueueTxRejects);
+    buf.putLong (enqueueTxCount);
+    buf.putLong (enqueueTxCountLow);
+    buf.putLong (enqueueTxCountHigh);
+
+    // Dequeue transactions
+    buf.putLongLong (dequeueTxStarts);
+    buf.putLongLong (dequeueTxCommits);
+    buf.putLongLong (dequeueTxRejects);
+    buf.putLong (dequeueTxCount);
+    buf.putLong (dequeueTxCountLow);
+    buf.putLong (dequeueTxCountHigh);
+
+    // Consumers
+    buf.putLong (consumers);
+    buf.putLong (consumersLow);
+    buf.putLong (consumersHigh);
+
+    // Start the next interval: both marks collapse onto the current value.
+    msgDepthLow       = msgDepthHigh       = msgDepth;
+    byteDepthLow      = byteDepthHigh      = byteDepth;
+    enqueueTxCountLow = enqueueTxCountHigh = enqueueTxCount;
+    dequeueTxCountLow = dequeueTxCountHigh = dequeueTxCount;
+    consumersLow      = consumersHigh      = consumers;
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,184 @@
+#ifndef _ManagementQueue_
+#define _ManagementQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid { 
+namespace management {
+
+const uint32_t MSG_MASK_TX      = 1;  // attrMask bit for Queue::enqueue/dequeue: transactional message
+const uint32_t MSG_MASK_PERSIST = 2;  // attrMask bit for Queue::enqueue/dequeue: persistent message
+
+// Management view of one queue: its configuration values plus the running
+// statistics reported by writeInstrumentation().
+//
+// Members are declared in the order in which they are written out; keep the
+// order stable.
+class Queue : public ManagementObject
+{
+  private:
+
+    static bool schemaNeeded;       // One flag shared by every Queue
+
+    // Configuration
+    uint64_t    vhostRef;           // Object id taken from the parent object
+    std::string name;               // Queue name
+    bool        durable;
+    bool        autoDelete;
+
+    // Message counters
+    uint64_t  msgTotalEnqueues;     // Total messages enqueued
+    uint64_t  msgTotalDequeues;     // Total messages dequeued
+    uint64_t  msgTxEnqueues;        // Transactional messages enqueued
+    uint64_t  msgTxDequeues;        // Transactional messages dequeued
+    uint64_t  msgPersistEnqueues;   // Persistent messages enqueued
+    uint64_t  msgPersistDequeues;   // Persistent messages dequeued
+
+    uint32_t  msgDepth;             // Current size of queue in messages
+    uint32_t  msgDepthLow;          // Low-water queue size, this interval
+    uint32_t  msgDepthHigh;         // High-water queue size, this interval
+
+    // Byte counters
+    uint64_t  byteTotalEnqueues;    // Total bytes enqueued
+    uint64_t  byteTotalDequeues;    // Total bytes dequeued
+    uint64_t  byteTxEnqueues;       // Transactional bytes enqueued
+    uint64_t  byteTxDequeues;       // Transactional bytes dequeued
+    uint64_t  bytePersistEnqueues;  // Persistent bytes enqueued
+    uint64_t  bytePersistDequeues;  // Persistent bytes dequeued
+
+    uint32_t  byteDepth;            // Current size of queue in bytes (32 bit)
+    uint32_t  byteDepthLow;         // Low-water mark this interval
+    uint32_t  byteDepthHigh;        // High-water mark this interval
+
+    // Enqueue transactions
+    uint64_t  enqueueTxStarts;      // Total enqueue transactions started
+    uint64_t  enqueueTxCommits;     // Total enqueue transactions committed
+    uint64_t  enqueueTxRejects;     // Total enqueue transactions rejected
+
+    uint32_t  enqueueTxCount;       // Current pending enqueue transactions
+    uint32_t  enqueueTxCountLow;    // Low water mark this interval
+    uint32_t  enqueueTxCountHigh;   // High water mark this interval
+
+    // Dequeue transactions
+    uint64_t  dequeueTxStarts;      // Total dequeue transactions started
+    uint64_t  dequeueTxCommits;     // Total dequeue transactions committed
+    uint64_t  dequeueTxRejects;     // Total dequeue transactions rejected
+
+    uint32_t  dequeueTxCount;       // Current pending dequeue transactions
+    uint32_t  dequeueTxCountLow;    // Low water mark this interval
+    uint32_t  dequeueTxCountHigh;   // High water mark this interval
+
+    // Consumers
+    uint32_t  consumers;            // Current consumers on queue
+    uint32_t  consumersLow;         // Low water mark this interval
+    uint32_t  consumersHigh;        // High water mark this interval
+
+    // ManagementObject interface
+    uint16_t    getObjectType        (void) { return OBJECT_QUEUE; }
+    std::string getObjectName        (void) { return "queue"; }
+    void        writeSchema          (qpid::framing::Buffer& buf);
+    void        writeConfig          (qpid::framing::Buffer& buf);
+    void        writeInstrumentation (qpid::framing::Buffer& buf);
+    bool        getSchemaNeeded      (void) { return schemaNeeded; }
+    void        setSchemaNeeded      (void) { schemaNeeded = true; }
+
+    // No methods are implemented for queues; the call does nothing.
+    void        doMethod             (std::string            /*methodName*/,
+                                      qpid::framing::Buffer& /*inBuf*/,
+                                      qpid::framing::Buffer& /*outBuf*/) {}
+
+    // Widen the message and byte depth marks so that they enclose the current
+    // depths, and flag the statistics as changed.
+    void adjustQueueHiLo (void)
+    {
+        if (msgDepthHigh < msgDepth) msgDepthHigh = msgDepth;
+        if (msgDepthLow  > msgDepth) msgDepthLow  = msgDepth;
+
+        if (byteDepthHigh < byteDepth) byteDepthHigh = byteDepth;
+        if (byteDepthLow  > byteDepth) byteDepthLow  = byteDepth;
+
+        instChanged = true;
+    }
+
+    // Same as adjustQueueHiLo(), for the pending-transaction gauges.
+    void adjustTxHiLo (void)
+    {
+        if (enqueueTxCountHigh < enqueueTxCount) enqueueTxCountHigh = enqueueTxCount;
+        if (enqueueTxCountLow  > enqueueTxCount) enqueueTxCountLow  = enqueueTxCount;
+
+        if (dequeueTxCountHigh < dequeueTxCount) dequeueTxCountHigh = dequeueTxCount;
+        if (dequeueTxCountLow  > dequeueTxCount) dequeueTxCountLow  = dequeueTxCount;
+
+        instChanged = true;
+    }
+
+    // Same as adjustQueueHiLo(), for the consumer gauge.
+    void adjustConsumerHiLo (void)
+    {
+        if (consumersHigh < consumers) consumersHigh = consumers;
+        if (consumersLow  > consumers) consumersLow  = consumers;
+
+        instChanged = true;
+    }
+
+  public:
+
+    typedef boost::shared_ptr<Queue> shared_ptr;
+
+    Queue (Manageable* coreObject, Manageable* parentObject,
+           const std::string& name, bool durable, bool autoDelete);
+    ~Queue (void);
+
+    // In enqueue() and dequeue(), attrMask describes the message being
+    // counted: a bitwise OR of MSG_MASK_TX and MSG_MASK_PERSIST.
+
+    // Count one message of `bytes` bytes entering the queue.
+    void enqueue (uint64_t bytes, uint32_t attrMask = 0)
+    {
+        ++msgTotalEnqueues;
+        byteTotalEnqueues += bytes;
+
+        if ((attrMask & MSG_MASK_TX) != 0) {
+            ++msgTxEnqueues;
+            byteTxEnqueues += bytes;
+        }
+
+        if ((attrMask & MSG_MASK_PERSIST) != 0) {
+            ++msgPersistEnqueues;
+            bytePersistEnqueues += bytes;
+        }
+
+        // byteDepth is 32 bits wide, so this addition is modulo 2^32.
+        ++msgDepth;
+        byteDepth += bytes;
+        adjustQueueHiLo ();
+    }
+
+    // Count one message of `bytes` bytes leaving the queue.
+    void dequeue (uint64_t bytes, uint32_t attrMask = 0)
+    {
+        ++msgTotalDequeues;
+        byteTotalDequeues += bytes;
+
+        if ((attrMask & MSG_MASK_TX) != 0) {
+            ++msgTxDequeues;
+            byteTxDequeues += bytes;
+        }
+
+        if ((attrMask & MSG_MASK_PERSIST) != 0) {
+            ++msgPersistDequeues;
+            bytePersistDequeues += bytes;
+        }
+
+        // Unsigned arithmetic: the depths wrap around instead of going
+        // negative if more is dequeued than was enqueued.
+        --msgDepth;
+        byteDepth -= bytes;
+        adjustQueueHiLo ();
+    }
+
+    // A consumer has been added to the queue.
+    void incConsumers (void)
+    {
+        ++consumers;
+        adjustConsumerHiLo ();
+    }
+
+    // A consumer has been removed from the queue.
+    void decConsumers (void)
+    {
+        --consumers;
+        adjustConsumerHiLo ();
+    }
+};
+
+}}
+            
+
+
+#endif  /*!_ManagementQueue_*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+#include "Manageable.h"
+#include "Vhost.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Vhost::schemaNeeded = true;  // one flag for all vhosts: cleared by writeSchema(), set again by setSchemaNeeded()
+
+// Build the management record for the virtual host.
+//
+//   _core   - passed straight through to the ManagementObject base
+//   _parent - the id of its management object is stored as brokerRef
+//
+// The name is always "/".  The initialisers follow the declaration order of
+// the members in Vhost.h.
+Vhost::Vhost (Manageable* _core, Manageable* _parent) :
+    ManagementObject (_core),
+    brokerRef        (_parent->GetManagementObject ()->getObjectId ()),
+    name             ("/")
+{
+}
+
+Vhost::~Vhost () {}  // no explicit cleanup is performed here
+
+// Write the schema of the vhost class into buf and clear the class-wide
+// schemaNeeded flag.  The two items are listed in the order writeConfig()
+// emits their values.
+//
+// NOTE(review): the trailing `true` presumably marks an item as
+// configuration - confirm against schemaItem().
+void Vhost::writeSchema (Buffer& buf)
+{
+    schemaNeeded = false;
+
+    schemaListBegin (buf);
+    schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference", true);
+    schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true);
+    schemaListEnd (buf);
+}
+
+// Write the vhost's configuration record into buf and clear configChanged.
+// Layout: timestamps, brokerRef (64 bit), name (short string).
+void Vhost::writeConfig (Buffer& buf)
+{
+    configChanged = false;
+
+    writeTimestamps (buf);
+    buf.putLongLong (brokerRef);
+    buf.putShortString (name);
+}
+

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h Mon Nov 12 
16:34:09 2007
@@ -0,0 +1,65 @@
+#ifndef _ManagementVhost_
+#define _ManagementVhost_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "ManagementObject.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid { 
+namespace management {
+
+// Management view of the virtual host.  It carries configuration only: a
+// reference to the broker object and a name.
+class Vhost : public ManagementObject
+{
+  public:
+
+    typedef boost::shared_ptr<Vhost> shared_ptr;
+
+    Vhost  (Manageable* coreObject, Manageable* parentObject);
+    ~Vhost (void);
+
+  private:
+
+    static bool schemaNeeded;   // One flag shared by every Vhost
+
+    uint64_t    brokerRef;      // Object id taken from the parent object
+    std::string name;           // Name of the virtual host
+
+    // ManagementObject interface
+    uint16_t    getObjectType   (void) { return OBJECT_VHOST; }
+    std::string getObjectName   (void) { return "vhost"; }
+    void        writeSchema     (qpid::framing::Buffer& buf);
+    void        writeConfig     (qpid::framing::Buffer& buf);
+    bool        getSchemaNeeded (void) { return schemaNeeded; }
+    void        setSchemaNeeded (void) { schemaNeeded = true; }
+
+    // A vhost has no statistics: nothing is written, and the instrumentation
+    // is never reported as changed.
+    void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
+    bool getInstChanged       (void) { return false; }
+
+    // No methods are implemented for vhosts; the call does nothing.
+    void doMethod (std::string            /*methodName*/,
+                   qpid::framing::Buffer& /*inBuf*/,
+                   qpid::framing::Buffer& /*outBuf*/) {}
+};
+
+}}
+
+
+#endif  /*!_ManagementVhost_*/


Reply via email to