Author: tross
Date: Wed Nov 12 20:15:15 2008
New Revision: 713631

URL: http://svn.apache.org/viewvc?rev=713631&view=rev
Log:
Updated the qmf-agent API to allow the user to specify uid, password, mechanism,
and protocol.
Fixed a qmf-console bug related to routing keys of object messages.
Pass the binding key into the management agent to allow for selective broadcast
of object data.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
    incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h Wed Nov 12 
20:15:15 2008
@@ -81,24 +81,28 @@
     //   storeFile         - File where this process has read and write 
access.  This
     //                       file shall be used to store persistent state.
     //
-    virtual void init (std::string brokerHost        = "localhost",
-                       uint16_t    brokerPort        = 5672,
-                       uint16_t    intervalSeconds   = 10,
-                       bool        useExternalThread = false,
-                       std::string storeFile         = "") = 0;
+    virtual void init(const std::string& brokerHost = "localhost",
+                      uint16_t brokerPort = 5672,
+                      uint16_t intervalSeconds = 10,
+                      bool useExternalThread = false,
+                      const std::string& storeFile = "",
+                      const std::string& uid = "guest",
+                      const std::string& pwd = "guest",
+                      const std::string& mech = "PLAIN",
+                      const std::string& proto = "tcp") = 0;
 
     // Register a schema with the management agent.  This is normally called 
by the
     // package initializer generated by the management code generator.
     //
     virtual void
-    registerClass(std::string& packageName,
-                  std::string& className,
+    registerClass(const std::string& packageName,
+                  const std::string& className,
                   uint8_t*    md5Sum,
                   management::ManagementObject::writeSchemaCall_t schemaCall) 
= 0;
 
     virtual void
-    registerEvent(std::string& packageName,
-                  std::string& eventName,
+    registerEvent(const std::string& packageName,
+                  const std::string& eventName,
                   uint8_t*    md5Sum,
                   management::ManagementEvent::writeSchemaCall_t schemaCall) = 
0;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed 
Nov 12 20:15:15 2008
@@ -80,7 +80,7 @@
 
 ManagementAgentImpl::ManagementAgentImpl() :
     extThread(false), writeFd(-1), readFd(-1),
-    connected(false), lastFailure("never connected"),
+    initialized(false), connected(false), lastFailure("never connected"),
     clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
     assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), 
debugLevel(0),
     connThreadBody(*this), connThread(connThreadBody),
@@ -102,18 +102,26 @@
     }
 }
 
-void ManagementAgentImpl::init(string    brokerHost,
-                               uint16_t  brokerPort,
-                               uint16_t  intervalSeconds,
-                               bool      useExternalThread,
-                               string    _storeFile)
+void ManagementAgentImpl::init(const string& brokerHost,
+                               uint16_t brokerPort,
+                               uint16_t intervalSeconds,
+                               bool useExternalThread,
+                               const string& _storeFile,
+                               const string& uid,
+                               const string& pwd,
+                               const string& mech,
+                               const string& proto)
 {
     interval     = intervalSeconds;
     extThread    = useExternalThread;
     storeFile    = _storeFile;
     nextObjectId = 1;
-    host         = brokerHost;
-    port         = brokerPort;
+    connectionSettings.protocol = proto;
+    connectionSettings.host = brokerHost;
+    connectionSettings.port = brokerPort;
+    connectionSettings.username = uid;
+    connectionSettings.password = pwd;
+    connectionSettings.mechanism = mech;
 
     if (debugLevel)
         cout << "QMF Agent Initialized: broker=" << brokerHost << ":" << 
brokerPort <<
@@ -139,10 +147,12 @@
     if ((bootSequence & 0xF000) != 0)
         bootSequence = 1;
     storeData(true);
+
+    initialized = true;
 }
 
-void ManagementAgentImpl::registerClass(string& packageName,
-                                        string& className,
+void ManagementAgentImpl::registerClass(const string& packageName,
+                                        const string& className,
                                         uint8_t*     md5Sum,
                                         
management::ManagementObject::writeSchemaCall_t schemaCall)
 { 
@@ -151,8 +161,8 @@
     addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, 
schemaCall);
 }
 
-void ManagementAgentImpl::registerEvent(string& packageName,
-                                        string& eventName,
+void ManagementAgentImpl::registerEvent(const string& packageName,
+                                        const string& eventName,
                                         uint8_t*     md5Sum,
                                         
management::ManagementObject::writeSchemaCall_t schemaCall)
 { 
@@ -605,7 +615,6 @@
     Mutex::ScopedLock lock(agentLock);
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
-    string              routingKey;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
     if (!connected)
@@ -692,8 +701,10 @@
         contentSize = BUFSIZE - msgBuffer.available();
         if (contentSize > 0) {
             msgBuffer.reset();
-            routingKey = "console.obj." + baseObject->getPackageName() + "." + 
baseObject->getClassName();
-            connThreadBody.sendBuffer(msgBuffer, contentSize, 
"qpid.management", routingKey);
+            stringstream key;
+            key << "console.obj." << baseObject->getPackageName() << "." << 
baseObject->getClassName() << "." <<
+                assignedBrokerBank << "." << assignedAgentBank;
+            connThreadBody.sendBuffer(msgBuffer, contentSize, 
"qpid.management", key.str());
         }
     }
 
@@ -721,10 +732,10 @@
 
     while (true) {
         try {
-            if (!agent.host.empty()) {
+            if (agent.initialized) {
                 if (agent.debugLevel)
                     cout << "QMF Agent attempting to connect to the broker..." 
<< endl;
-                connection.open(agent.host.c_str(), agent.port);
+                connection.open(agent.connectionSettings);
                 session = connection.newSession(queueName.str());
                 subscriptions = new client::SubscriptionManager(session);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Nov 
12 20:15:15 2008
@@ -22,6 +22,7 @@
 
 #include "ManagementAgent.h"
 #include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/AsyncSession.h"
@@ -49,19 +50,23 @@
     // Methods from ManagementAgent
     //
     int getMaxThreads() { return 1; }
-    void init(std::string brokerHost        = "localhost",
-              uint16_t    brokerPort        = 5672,
-              uint16_t    intervalSeconds   = 10,
-              bool        useExternalThread = false,
-              std::string storeFile         = "");
+    void init(const std::string& brokerHost = "localhost",
+              uint16_t brokerPort = 5672,
+              uint16_t intervalSeconds = 10,
+              bool useExternalThread = false,
+              const std::string& storeFile = "",
+              const std::string& uid = "guest",
+              const std::string& pwd = "guest",
+              const std::string& mech = "PLAIN",
+              const std::string& proto = "tcp");
     bool isConnected() { return connected; }
     std::string& getLastFailure() { return lastFailure; }
-    void registerClass(std::string& packageName,
-                       std::string& className,
+    void registerClass(const std::string& packageName,
+                       const std::string& className,
                        uint8_t*     md5Sum,
                        management::ManagementObject::writeSchemaCall_t 
schemaCall);
-    void registerEvent(std::string& packageName,
-                       std::string& eventName,
+    void registerEvent(const std::string& packageName,
+                       const std::string& eventName,
                        uint8_t*     md5Sum,
                        management::ManagementObject::writeSchemaCall_t 
schemaCall);
     ObjectId addObject(management::ManagementObject* objectPtr, uint64_t 
persistId = 0);
@@ -130,8 +135,8 @@
     sys::Mutex        agentLock;
     sys::Mutex        addLock;
     framing::Uuid     systemId;
-    std::string       host;
-    uint16_t          port;
+    client::ConnectionSettings connectionSettings;
+    bool              initialized;
     bool              connected;
     std::string       lastFailure;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed 
Nov 12 20:15:15 2008
@@ -113,7 +113,7 @@
     }
 }
 
-void ManagementBroker::configure(string _dataDir, uint16_t _interval,
+void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
                                  qpid::broker::Broker* _broker, int _threads)
 {
     dataDir        = _dataDir;
@@ -178,8 +178,8 @@
     dExchange = _dexchange;
 }
 
-void ManagementBroker::registerClass (string&  packageName,
-                                      string&  className,
+void ManagementBroker::registerClass (const string&  packageName,
+                                      const string&  className,
                                       uint8_t* md5Sum,
                                       ManagementObject::writeSchemaCall_t 
schemaCall)
 {
@@ -188,8 +188,8 @@
     addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, 
schemaCall);
 }
 
-void ManagementBroker::registerEvent (string&  packageName,
-                                      string&  eventName,
+void ManagementBroker::registerEvent (const string&  packageName,
+                                      const string&  eventName,
                                       uint8_t* md5Sum,
                                       ManagementObject::writeSchemaCall_t 
schemaCall)
 {
@@ -251,7 +251,7 @@
     broker.periodicProcessing ();
 }
 
-void ManagementBroker::clientAdded (void)
+void ManagementBroker::clientAdded (const std::string& /*routingKey*/)
 {
     Mutex::ScopedLock lock (userLock);
 
@@ -386,7 +386,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.obj." + object->getPackageName() + "." + 
object->getClassName ();
+            routingKey = "console.obj." + object->getPackageName() + "." + 
object->getClassName() + "1.0";
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
         
@@ -398,7 +398,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.obj." + object->getPackageName() + "." + 
object->getClassName ();
+            routingKey = "console.obj." + object->getPackageName() + "." + 
object->getClassName() + "1.0";
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 
@@ -721,7 +721,7 @@
     for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
          aIter != remoteAgents.end();
          aIter++)
-        if (aIter->second->objIdBank == bank)
+        if (aIter->second->brokerBank == bank)
             return true;
     return false;
 }
@@ -796,7 +796,8 @@
     assignedBank = assignBankLH(requestedAgentBank);
 
     RemoteAgent* agent = new RemoteAgent;
-    agent->objIdBank  = assignedBank;
+    agent->brokerBank = brokerBank;
+    agent->agentBank  = assignedBank;
     agent->routingKey = replyToKey;
     agent->connectionRef = connectionRef;
     agent->mgmtObject = new _qmf::Agent (this, agent);
@@ -1006,7 +1007,7 @@
 
 void ManagementBroker::addClassLH(uint8_t               kind,
                                   PackageMap::iterator  pIter,
-                                  string&               className,
+                                  const string&         className,
                                   uint8_t*              md5Sum,
                                   ManagementObject::writeSchemaCall_t 
schemaCall)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Wed 
Nov 12 20:15:15 2008
@@ -47,30 +47,33 @@
     ManagementBroker ();
     virtual ~ManagementBroker ();
 
-    void configure       (std::string dataDir, uint16_t interval, 
qpid::broker::Broker* broker, int threadPoolSize);
+    void configure       (const std::string& dataDir, uint16_t interval,
+                          qpid::broker::Broker* broker, int threadPoolSize);
     void setInterval     (uint16_t _interval) { interval = _interval; }
     void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
                           qpid::broker::Exchange::shared_ptr directExchange);
     int  getMaxThreads   () { return threadPoolSize; }
-    void registerClass   (std::string& packageName,
-                          std::string& className,
+    void registerClass   (const std::string& packageName,
+                          const std::string& className,
                           uint8_t*    md5Sum,
                           ManagementObject::writeSchemaCall_t schemaCall);
-    void registerEvent   (std::string& packageName,
-                          std::string& eventName,
+    void registerEvent   (const std::string& packageName,
+                          const std::string& eventName,
                           uint8_t*    md5Sum,
                           ManagementObject::writeSchemaCall_t schemaCall);
     ObjectId addObject   (ManagementObject* object,
                           uint64_t          persistId = 0);
     void raiseEvent(const ManagementEvent& event, severity_t severity = 
SEV_DEFAULT);
-    void clientAdded     ();
+    void clientAdded     (const std::string& routingKey);
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
     const framing::Uuid& getUuid() const { return uuid; }
 
     // Stubs for remote management agent calls
-    void init (std::string, uint16_t, uint16_t, bool, std::string) { 
assert(0); }
+    void init (const std::string&, uint16_t, uint16_t, bool,
+               const std::string&, const std::string&, const std::string&,
+               const std::string&, const std::string&) { assert(0); }
     uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
     int getSignalFd () { assert(0); return -1; }
 
@@ -91,7 +94,8 @@
     //
     struct RemoteAgent : public Manageable
     {
-        uint32_t          objIdBank;
+        uint32_t          brokerBank;
+        uint32_t          agentBank;
         std::string       routingKey;
         ObjectId          connectionRef;
         qmf::org::apache::qpid::broker::Agent*    mgmtObject;
@@ -195,7 +199,7 @@
     PackageMap::iterator findOrAddPackageLH(std::string name);
     void addClassLH(uint8_t                      kind,
                     PackageMap::iterator         pIter,
-                    std::string&                 className,
+                    const std::string&           className,
                     uint8_t*                     md5Sum,
                     ManagementObject::writeSchemaCall_t schemaCall);
     void encodePackageIndication (framing::Buffer&     buf,

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
Wed Nov 12 20:15:15 2008
@@ -56,8 +56,8 @@
                                const string& routingKey,
                                const qpid::framing::FieldTable* args)
 {
-    managementAgent->clientAdded ();
-    return TopicExchange::bind (queue, routingKey, args);
+    managementAgent->clientAdded(routingKey);
+    return TopicExchange::bind(queue, routingKey, args);
 }
 
 void ManagementExchange::setManagmentAgent (ManagementBroker* agent)

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Wed Nov 12 20:15:15 2008
@@ -224,7 +224,7 @@
       raise Exception("userBindings option not set for Session")
     for broker in self.brokers:
       broker.amqpSession.exchange_bind(exchange="qpid.management", 
queue=broker.topicName,
-                                       binding_key="console.obj.%s" % 
packageName)
+                                       binding_key="console.obj.%s.#" % 
packageName)
 
   def bindClass(self, classKey):
     """ """
@@ -233,7 +233,7 @@
     pname, cname, hash = classKey
     for broker in self.brokers:
       broker.amqpSession.exchange_bind(exchange="qpid.management", 
queue=broker.topicName,
-                                       binding_key="console.obj.%s.%s" % 
(pname, cname))
+                                       binding_key="console.obj.%s.%s.#" % 
(pname, cname))
 
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
@@ -374,7 +374,7 @@
       if self.rcvEvents:
         keyList.append("console.event.#")
       if self.rcvHeartbeats:
-        keyList.append("console.heartbeat")
+        keyList.append("console.heartbeat.#")
     return keyList
 
   def _handleBrokerConnect(self, broker):
@@ -692,13 +692,6 @@
   def __init__(self, name):
     self.name = name
 
-class ClassKey:
-  """ """
-  def __init__(self, package, className, hash):
-    self.package = package
-    self.className = className
-    self.hash = hash
-
 class SchemaClass:
   """ """
   CLASS_KIND_TABLE = 1

Modified: incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=713631&r1=713630&r2=713631&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Wed Nov 12 20:15:15 2008
@@ -217,7 +217,7 @@
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
           :queue => broker.topicName,
-          :binding_key => "console.obj.#{package_name}" }
+          :binding_key => "console.obj.#{package_name}.#" }
         broker.amqpSession.exchange_bind(args)
       end
     end
@@ -230,7 +230,7 @@
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
           :queue => broker.topicName,
-          :binding_key => "console.obj.#{pname}.#{cname}" }
+          :binding_key => "console.obj.#{pname}.#{cname}.#" }
         broker.amqpSession.exchange_bind(args)
       end
     end


Reply via email to