Author: tross
Date: Fri Nov 21 12:17:22 2008
New Revision: 719699

URL: http://svn.apache.org/viewvc?rev=719699&view=rev
Log:
Fixed several problems related to qmf update timestamps:
  - Timestamps were set at update send time regardless of whether
    the object's contents were actually changed.  Now timestamps are
    set at the time of the change.
  - Agent heartbeat messages are now being sent after periodic
    updates, not before.

Cleaned up the Agent object in qmf.console.

Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/qmf/console.py

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py Fri Nov 21 
12:17:22 2008
@@ -145,6 +145,7 @@
         stream.write ("            " + prefix + varName + "Max = val;\n")
       if changeFlag != None:
         stream.write ("        " + changeFlag + " = true;\n")
+        stream.write ("        setUpdateTime();\n")
       stream.write ("    }\n")
       if self.style != "mma":
         stream.write ("    inline " + self.asArg + " get_" + varName + "() 
{\n");
@@ -157,6 +158,7 @@
         stream.write ("        presenceMask[presenceByte_%s] &= 
~presenceMask_%s;\n" % (varName, varName))
         if changeFlag != None:
           stream.write ("        " + changeFlag + " = true;\n")
+          stream.write ("        setUpdateTime();\n")
         stream.write ("    }\n")
         stream.write ("    inline bool isSet_" + varName + "() {\n")
         stream.write ("        return (presenceMask[presenceByte_%s] & 
presenceMask_%s) != 0;\n" % (varName, varName))
@@ -171,6 +173,7 @@
         stream.write ("            " + varName + "High = " + varName + ";\n")
       if changeFlag != None:
         stream.write ("        " + changeFlag + " = true;\n")
+        stream.write ("        setUpdateTime();\n")
       stream.write ("    }\n");
       stream.write ("    inline void dec_" + varName + " (" + self.asArg + " 
by = 1) {\n");
       if not self.perThread:
@@ -181,6 +184,7 @@
         stream.write ("            " + varName + "Low = " + varName + ";\n")
       if changeFlag != None:
         stream.write ("        " + changeFlag + " = true;\n")
+        stream.write ("        setUpdateTime();\n")
       stream.write ("    }\n");
 
   def genHiLoStatResets (self, stream, varName):

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri 
Nov 21 12:17:22 2008
@@ -687,18 +687,6 @@
     if (!connected)
         return;
 
-    {
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'h');
-        msgBuffer.putLongLong(uint64_t(Duration(now())));
-        stringstream key;
-        key << "console.heartbeat." << assignedBrokerBank << "." << 
assignedAgentBank;
-
-        contentSize = BUFSIZE - msgBuffer.available();
-        msgBuffer.reset();
-        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", 
key.str());
-    }
-
     moveNewObjectsLH();
 
     if (debugLevel >= DEBUG_PUBLISH) {
@@ -715,9 +703,6 @@
         }
     }
 
-    if (managementObjects.empty())
-        return;
-
     //
     //  Clear the been-here flag on all objects in the map.
     //
@@ -792,6 +777,18 @@
     }
 
     deleteList.clear();
+
+    {
+        Buffer msgBuffer(msgChars, BUFSIZE);
+        encodeHeader(msgBuffer, 'h');
+        msgBuffer.putLongLong(uint64_t(Duration(now())));
+        stringstream key;
+        key << "console.heartbeat." << assignedBrokerBank << "." << 
assignedAgentBank;
+
+        contentSize = BUFSIZE - msgBuffer.available();
+        msgBuffer.reset();
+        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", 
key.str());
+    }
 }
 
 void ManagementAgentImpl::ConnectionThread::run()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri 
Nov 21 12:17:22 2008
@@ -344,17 +344,6 @@
     string              routingKey;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
-    {
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'h');
-        msgBuffer.putLongLong(uint64_t(Duration(now())));
-
-        contentSize = BUFSIZE - msgBuffer.available ();
-        msgBuffer.reset ();
-        routingKey = "console.heartbeat.1.0";
-        sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
-    }
-
     moveNewObjectsLH();
 
     if (clientWasAdded) {
@@ -367,9 +356,6 @@
         }
     }
 
-    if (managementObjects.empty ())
-        return;
-        
     for (ManagementObjectMap::iterator iter = managementObjects.begin ();
          iter != managementObjects.end ();
          iter++)
@@ -416,6 +402,17 @@
         deleteList.clear();
         deleteOrphanedAgentsLH();
     }
+
+    {
+        Buffer msgBuffer(msgChars, BUFSIZE);
+        encodeHeader(msgBuffer, 'h');
+        msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+        contentSize = BUFSIZE - msgBuffer.available ();
+        msgBuffer.reset ();
+        routingKey = "console.heartbeat.1.0";
+        sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+    }
 }
 
 void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t 
sequence,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Fri 
Nov 21 12:17:22 2008
@@ -163,7 +163,7 @@
     buf.putShortString (getPackageName ());
     buf.putShortString (getClassName ());
     buf.putBin128      (getMd5Sum ());
-    buf.putLongLong    (uint64_t (sys::Duration (sys::now ())));
+    buf.putLongLong    (updateTime);
     buf.putLongLong    (createTime);
     buf.putLongLong    (destroyTime);
     objectId.encode(buf);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Fri 
Nov 21 12:17:22 2008
@@ -114,6 +114,7 @@
     
     uint64_t         createTime;
     uint64_t         destroyTime;
+    uint64_t         updateTime;
     ObjectId         objectId;
     bool             configChanged;
     bool             instChanged;
@@ -132,11 +133,11 @@
   public:
     typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
 
-    ManagementObject (ManagementAgent* _agent, Manageable* _core) :
-        destroyTime(0), configChanged(true),
-        instChanged(true), deleted(false), coreObject(_core), agent(_agent)
-    { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
-    virtual ~ManagementObject () {}
+    ManagementObject(ManagementAgent* _agent, Manageable* _core) :
+        createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
+        destroyTime(0), updateTime(createTime), configChanged(true),
+        instChanged(true), deleted(false), coreObject(_core), agent(_agent) {}
+    virtual ~ManagementObject() {}
 
     virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
     virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
@@ -159,6 +160,7 @@
         configChanged = true;
         instChanged   = true;
     }
+    inline void setUpdateTime() { updateTime = 
(uint64_t(sys::Duration(sys::now()))); }
 
     inline void resourceDestroy  (void) {
         destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));

Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Fri Nov 21 12:17:22 2008
@@ -340,7 +340,7 @@
         self.cv.release()
       broker._setHeader(sendCodec, 'G', seq)
       sendCodec.write_map(map)
-      smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
+      smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % 
(agent.brokerBank, agent.agentBank))
       broker._send(smsg)
 
     starttime = time()
@@ -507,7 +507,7 @@
 
     agent = broker.getAgent(brokerBank, agentBank)
     timestamp = codec.read_uint64()
-    if self.console != None:
+    if self.console != None and agent != None:
       self.console.heartbeat(agent, timestamp)
 
   def _handleEventInd(self, broker, codec, seq):
@@ -1135,7 +1135,7 @@
     self.authPass = authPass
     self.topicCredits = topicCredits
     self.agents = {}
-    self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
+    self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
     self.topicBound = False
     self.cv = Condition()
     self.syncInFlight = False
@@ -1162,7 +1162,7 @@
     return 1
 
   def getAgent(self, brokerBank, agentBank):
-    bankKey = "%d.%d" % (brokerBank, agentBank)
+    bankKey = (brokerBank, agentBank)
     if bankKey in self.agents:
       return self.agents[bankKey]
     return None
@@ -1250,10 +1250,10 @@
       self.error = "Connect Failed %d - %s" % (e[0], e[1])
 
   def _updateAgent(self, obj):
-    bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+    bankKey = (obj.brokerBank, obj.agentBank)
     if obj._deleteTime == 0:
       if bankKey not in self.agents:
-        agent = Agent(self, bankKey, obj.label)
+        agent = Agent(self, obj.agentBank, obj.label)
         self.agents[bankKey] = agent
         if self.session.console != None:
           self.session.console.newAgent(agent)
@@ -1377,19 +1377,23 @@
 
 class Agent:
   """ """
-  def __init__(self, broker, bank, label):
+  def __init__(self, broker, agentBank, label):
     self.broker = broker
-    self.bank   = bank
-    self.label  = label
+    self.brokerBank = broker.getBrokerBank()
+    self.agentBank = agentBank
+    self.label = label
 
   def __repr__(self):
-    return "Agent at bank %s (%s)" % (self.bank, self.label)
+    return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, 
self.label)
 
   def getBroker(self):
     return self.broker
 
+  def getBrokerBank(self):
+    return self.brokerBank
+
   def getAgentBank(self):
-    return self.bank
+    return self.agentBank
 
 class Event:
   """ """


Reply via email to