Author: tross
Date: Tue Nov 4 15:01:57 2008
New Revision: 711458
URL: http://svn.apache.org/viewvc?rev=711458&view=rev
Log:
Added bank numbers to the routing key of a QMF heartbeat message.
This is used by the console to identify which agent sent the indication.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue
Nov 4 15:01:57 2008
@@ -22,7 +22,6 @@
#include "qpid/management/ManagementObject.h"
#include "ManagementAgentImpl.h"
#include <list>
-#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -597,11 +596,12 @@
Buffer msgBuffer(msgChars, BUFSIZE);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." <<
assignedAgentBank;
contentSize = BUFSIZE - msgBuffer.available();
msgBuffer.reset();
- routingKey = "console.heartbeat";
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management",
routingKey);
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management",
key.str());
}
moveNewObjectsLH();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue
Nov 4 15:01:57 2008
@@ -351,7 +351,7 @@
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.heartbeat";
+ routingKey = "console.heartbeat.1.0";
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Tue Nov 4 15:01:57 2008
@@ -489,10 +489,21 @@
if self.console:
self.console.methodResponse(broker, seq, result)
- def _handleHeartbeatInd(self, broker, codec, seq):
+ def _handleHeartbeatInd(self, broker, codec, seq, msg):
+ brokerBank = 1
+ agentBank = 0
+ dp = msg.get("delivery_properties")
+ if dp:
+ key = dp["routing_key"]
+ keyElements = key.split(".")
+ if len(keyElements) == 4:
+ brokerBank = int(keyElements[2])
+ agentBank = int(keyElements[3])
+
+ agent = broker.getAgent(brokerBank, agentBank)
timestamp = codec.read_uint64()
if self.console != None:
- self.console.heartbeat(None, timestamp)
+ self.console.heartbeat(agent, timestamp)
def _handleEventInd(self, broker, codec, seq):
if self.console != None:
@@ -1086,7 +1097,7 @@
self.authUser = authUser
self.authPass = authPass
self.agents = {}
- self.agents[0] = Agent(self, "1.0", "BrokerAgent")
+ self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1112,6 +1123,12 @@
def getBrokerBank(self):
return 1
+ def getAgent(self, brokerBank, agentBank):
+ bankKey = "%d.%d" % (brokerBank, agentBank)
+ if bankKey in self.agents:
+ return self.agents[bankKey]
+ return None
+
def getSessionId(self):
""" Get the identifier of the AMQP session to the broker """
return self.amqpSessionId
@@ -1287,7 +1304,7 @@
elif opcode == 'z': self.session._handleCommandComplete (self, codec,
seq)
elif opcode == 'q': self.session._handleClassInd (self, codec,
seq)
elif opcode == 'm': self.session._handleMethodResp (self, codec,
seq)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec,
seq)
+ elif opcode == 'h': self.session._handleHeartbeatInd (self, codec,
seq, msg)
elif opcode == 'e': self.session._handleEventInd (self, codec,
seq)
elif opcode == 's': self.session._handleSchemaResp (self, codec,
seq)
elif opcode == 'c': self.session._handleContentInd (self, codec,
seq, prop=True)