Author: tross
Date: Fri Nov 21 12:17:22 2008
New Revision: 719699
URL: http://svn.apache.org/viewvc?rev=719699&view=rev
Log:
Fixed several problems related to qmf update timestamps:
- Timestamps were set at update-send time regardless of whether
  the object's contents had actually changed. Now timestamps are
  set at the time of the change.
- Agent heartbeat messages are now sent after periodic
  updates, not before.
Also cleaned up the Agent object in qmf.console.
Modified:
incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/python/qmf/console.py
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/qmfgen/schema.py Fri Nov 21
12:17:22 2008
@@ -145,6 +145,7 @@
stream.write (" " + prefix + varName + "Max = val;\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n")
if self.style != "mma":
stream.write (" inline " + self.asArg + " get_" + varName + "()
{\n");
@@ -157,6 +158,7 @@
stream.write (" presenceMask[presenceByte_%s] &=
~presenceMask_%s;\n" % (varName, varName))
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n")
stream.write (" inline bool isSet_" + varName + "() {\n")
stream.write (" return (presenceMask[presenceByte_%s] &
presenceMask_%s) != 0;\n" % (varName, varName))
@@ -171,6 +173,7 @@
stream.write (" " + varName + "High = " + varName + ";\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n");
stream.write (" inline void dec_" + varName + " (" + self.asArg + "
by = 1) {\n");
if not self.perThread:
@@ -181,6 +184,7 @@
stream.write (" " + varName + "Low = " + varName + ";\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n");
def genHiLoStatResets (self, stream, varName):
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri
Nov 21 12:17:22 2008
@@ -687,18 +687,6 @@
if (!connected)
return;
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." <<
assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management",
key.str());
- }
-
moveNewObjectsLH();
if (debugLevel >= DEBUG_PUBLISH) {
@@ -715,9 +703,6 @@
}
}
- if (managementObjects.empty())
- return;
-
//
// Clear the been-here flag on all objects in the map.
//
@@ -792,6 +777,18 @@
}
deleteList.clear();
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." <<
assignedAgentBank;
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management",
key.str());
+ }
}
void ManagementAgentImpl::ConnectionThread::run()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri
Nov 21 12:17:22 2008
@@ -344,17 +344,6 @@
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "console.heartbeat.1.0";
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
moveNewObjectsLH();
if (clientWasAdded) {
@@ -367,9 +356,6 @@
}
}
- if (managementObjects.empty ())
- return;
-
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -416,6 +402,17 @@
deleteList.clear();
deleteOrphanedAgentsLH();
}
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "console.heartbeat.1.0";
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
}
void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t
sequence,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Fri
Nov 21 12:17:22 2008
@@ -163,7 +163,7 @@
buf.putShortString (getPackageName ());
buf.putShortString (getClassName ());
buf.putBin128 (getMd5Sum ());
- buf.putLongLong (uint64_t (sys::Duration (sys::now ())));
+ buf.putLongLong (updateTime);
buf.putLongLong (createTime);
buf.putLongLong (destroyTime);
objectId.encode(buf);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Fri
Nov 21 12:17:22 2008
@@ -114,6 +114,7 @@
uint64_t createTime;
uint64_t destroyTime;
+ uint64_t updateTime;
ObjectId objectId;
bool configChanged;
bool instChanged;
@@ -132,11 +133,11 @@
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject (ManagementAgent* _agent, Manageable* _core) :
- destroyTime(0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core), agent(_agent)
- { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
- virtual ~ManagementObject () {}
+ ManagementObject(ManagementAgent* _agent, Manageable* _core) :
+ createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
+ destroyTime(0), updateTime(createTime), configChanged(true),
+ instChanged(true), deleted(false), coreObject(_core), agent(_agent) {}
+ virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
@@ -159,6 +160,7 @@
configChanged = true;
instChanged = true;
}
+ inline void setUpdateTime() { updateTime =
(uint64_t(sys::Duration(sys::now()))); }
inline void resourceDestroy (void) {
destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=719699&r1=719698&r2=719699&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Fri Nov 21 12:17:22 2008
@@ -340,7 +340,7 @@
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
(agent.brokerBank, agent.agentBank))
broker._send(smsg)
starttime = time()
@@ -507,7 +507,7 @@
agent = broker.getAgent(brokerBank, agentBank)
timestamp = codec.read_uint64()
- if self.console != None:
+ if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
def _handleEventInd(self, broker, codec, seq):
@@ -1135,7 +1135,7 @@
self.authPass = authPass
self.topicCredits = topicCredits
self.agents = {}
- self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
+ self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1162,7 +1162,7 @@
return 1
def getAgent(self, brokerBank, agentBank):
- bankKey = "%d.%d" % (brokerBank, agentBank)
+ bankKey = (brokerBank, agentBank)
if bankKey in self.agents:
return self.agents[bankKey]
return None
@@ -1250,10 +1250,10 @@
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ bankKey = (obj.brokerBank, obj.agentBank)
if obj._deleteTime == 0:
if bankKey not in self.agents:
- agent = Agent(self, bankKey, obj.label)
+ agent = Agent(self, obj.agentBank, obj.label)
self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
@@ -1377,19 +1377,23 @@
class Agent:
""" """
- def __init__(self, broker, bank, label):
+ def __init__(self, broker, agentBank, label):
self.broker = broker
- self.bank = bank
- self.label = label
+ self.brokerBank = broker.getBrokerBank()
+ self.agentBank = agentBank
+ self.label = label
def __repr__(self):
- return "Agent at bank %s (%s)" % (self.bank, self.label)
+ return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank,
self.label)
def getBroker(self):
return self.broker
+ def getBrokerBank(self):
+ return self.brokerBank
+
def getAgentBank(self):
- return self.bank
+ return self.agentBank
class Event:
""" """