Author: tross
Date: Thu Nov 20 06:51:45 2008
New Revision: 719245

URL: http://svn.apache.org/viewvc?rev=719245&view=rev
Log:
QPID-1476 - routing keys used for updates can't be used to discriminate by agent
- Fixed routing keys in agents and binding keys in consoles
- Added some additional debug output for ManagementAgentImpl
- Minor cleanup in the connection close path for ManagementAgentImpl

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
    incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=719245&r1=719244&r2=719245&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Nov 20 06:51:45 2008
@@ -192,6 +192,10 @@
     Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
     uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+    stringstream key;
+
+    key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+        event.getPackageName() << "." << event.getEventName();
 
     encodeHeader(outBuffer, 'e');
     outBuffer.putShortString(event.getPackageName());
@@ -202,8 +206,7 @@
     event.encode(outBuffer);
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management",
-                              "console.event." + event.getPackageName() + "." + event.getEventName());
+    connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
 }
 
 uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -684,6 +687,10 @@
 
     moveNewObjectsLH();
 
+    if (debugLevel >= DEBUG_PUBLISH) {
+        cout << "Objects managed: " << managementObjects.size() << endl;
+    }
+
     if (clientWasAdded) {
         clientWasAdded = false;
         for (ManagementObjectMap::iterator iter = managementObjects.begin();
@@ -752,12 +759,16 @@
         if (contentSize > 0) {
             msgBuffer.reset();
             stringstream key;
-            key << "console.obj." << baseObject->getPackageName() << "." << baseObject->getClassName() << "." <<
-                assignedBrokerBank << "." << assignedAgentBank;
+            key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+                baseObject->getPackageName() << "." << baseObject->getClassName();
             connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
         }
     }
 
+    if (debugLevel >= DEBUG_PUBLISH && !deleteList.empty()) {
+        cout << "Deleting " << deleteList.size() << " objects" << endl;
+    }
+
     // Delete flagged objects
     for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
          iter != deleteList.rend();
@@ -798,6 +809,8 @@
                     cout << "    Connection established" << endl;
                 {
                     Mutex::ScopedLock _lock(connLock);
+                    if (shutdown)
+                        return;
                     operational = true;
                     agent.startProtocol();
                     try {
@@ -809,12 +822,12 @@
                         cout << "QMF Agent connection has been lost" << endl;
 
                     operational = false;
+                    agent.connected = false;
                 }
                 delay = delayMin;
+                connection.close();
                 delete subscriptions;
                 subscriptions = 0;
-                session.close();
-                connection.close();
             }
         } catch (exception &e) {
             if (delay < delayMax)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=719245&r1=719244&r2=719245&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Thu Nov 20 06:51:45 2008
@@ -237,7 +237,7 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
     sendBuffer(outBuffer, outLen, mExchange,
-               "console.event." + event.getPackageName() + "." + event.getEventName());
+               "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
 }
 
 ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -357,13 +357,11 @@
 
     moveNewObjectsLH();
 
-    if (clientWasAdded)
-    {
+    if (clientWasAdded) {
         clientWasAdded = false;
         for (ManagementObjectMap::iterator iter = managementObjects.begin ();
              iter != managementObjects.end ();
-             iter++)
-        {
+             iter++) {
             ManagementObject* object = iter->second;
             object->setAllChanged ();
         }
@@ -386,7 +384,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
+            routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
         
@@ -398,7 +396,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
+            routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=719245&r1=719244&r2=719245&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Thu Nov 20 06:51:45 2008
@@ -224,7 +224,7 @@
       raise Exception("userBindings option not set for Session")
     for broker in self.brokers:
       broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
-                                       binding_key="console.obj.%s.#" % packageName)
+                                       binding_key="console.obj.*.*.%s.#" % packageName)
 
   def bindClass(self, classKey):
     """ """
@@ -233,7 +233,7 @@
     pname, cname, hash = classKey
     for broker in self.brokers:
       broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
-                                       binding_key="console.obj.%s.%s.#" % (pname, cname))
+                                       binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
 
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
@@ -370,7 +370,7 @@
       if self.rcvObjects and not self.userBindings:
         keyList.append("console.obj.#")
       else:
-        keyList.append("console.obj.org.apache.qpid.broker.agent")
+        keyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
       if self.rcvEvents:
         keyList.append("console.event.#")
       if self.rcvHeartbeats:

Modified: incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=719245&r1=719244&r2=719245&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Thu Nov 20 06:51:45 2008
@@ -217,7 +217,7 @@
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
           :queue => broker.topicName,
-          :binding_key => "console.obj.#{package_name}.#" }
+          :binding_key => "console.obj.*.*.#{package_name}.#" }
         broker.amqpSession.exchange_bind(args)
       end
     end
@@ -230,7 +230,7 @@
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
           :queue => broker.topicName,
-          :binding_key => "console.obj.#{pname}.#{cname}.#" }
+          :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
         broker.amqpSession.exchange_bind(args)
       end
     end
@@ -637,7 +637,7 @@
         if @rcv_objects && ! @user_bindings
           key_list << "console.obj.#"
         else
-          key_list << "console.obj.org.apache.qpid.broker.agent"
+          key_list << "console.obj.*.*.org.apache.qpid.broker.agent"
         end
         key_list << "console.event.#" if @rcv_events
         key_list << "console.heartbeat.#" if @rcv_heartbeats


Reply via email to