Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed Sep 3 11:01:44 2008 @@ -27,6 +27,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" #include <list> #include <iostream> #include <fstream> @@ -80,8 +81,8 @@ ManagementBroker::ManagementBroker () : threadPoolSize(1), interval(10), broker(0) { - localBank = 5; nextObjectId = 1; + brokerBank = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; @@ -112,7 +113,7 @@ } } -void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; @@ -140,7 +141,10 @@ inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; writeData(); } else @@ -183,29 +187,26 @@ AddClass(pIter, className, md5Sum, schemaCall); } -uint64_t ManagementBroker::addObject (ManagementObject* object, - uint32_t persistId, - uint32_t persistBank) +ObjectId ManagementBroker::addObject (ManagementObject* object, + uint64_t persistId) { Mutex::ScopedLock lock (addLock); - uint64_t objectId; + uint16_t sequence; + uint64_t objectNum; - if (persistId == 0) - { - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - if ((nextObjectId & 0xFF000000) != 0) - { - nextObjectId = 1; - localBank++; - } + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; } - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + return objId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) @@ -308,7 +309,7 @@ char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); @@ -373,7 +374,7 @@ } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) managementObjects.erase (*iter); @@ -408,48 +409,72 @@ // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.<X>.# - // broker.# - // - // where <X> is any non-negative decimal integer less than the lowest remote - // object-id bank. + // agent.0.# + // broker if (routingKey == "broker") { - dispatchAgentCommandLH (msg); + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 7, "agent.0") == 0) { + dispatchAgentCommandLH(msg); return false; } else if (routingKey.compare(0, 6, "agent.") == 0) { - std::string::size_type delim = routingKey.find('.', 6); - if (delim == string::npos) - delim = routingKey.length(); - string bank = routingKey.substr(6, delim - 6); - if ((uint32_t) atoi(bank.c_str()) <= localBank) { - dispatchAgentCommandLH (msg); - return false; - } + return authorizeAgentMessageLH(msg); } return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) { string methodName; + string packageName; + string className; + uint8_t hash[16]; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + AclModule* acl = broker->getAcl(); - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + if (acl != 0) { + string userId = ((const broker::ConnectionState*) connToken)->getUserId(); + std::map<acl::Property, string> params; + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + return; + } + } + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -497,34 +522,33 @@ FindOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) + inBuffer.getShortString(packageName); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); cIter++) { - if (cIter->second->hasSchema ()) + if (cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + EncodeHeader(outBuffer, 'q', sequence); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } - - sendCommandComplete (replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) @@ -551,9 +575,7 @@ outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - SchemaClass* newSchema = new SchemaClass; - newSchema->pendingSequence = sequence; - pIter->second[key] = newSchema; + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); } } @@ -569,7 +591,7 @@ buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -578,33 +600,33 @@ inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); - PackageMap::iterator pIter = packages.find (packageName); + PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); + ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass* classInfo = cIter->second; + SchemaClass& classInfo = cIter->second; - if (classInfo->hasSchema()) { + if (classInfo.hasSchema()) { EncodeHeader(outBuffer, 's', sequence); - classInfo->appendSchema (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } else - sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); } else - sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); } else - sendCommandComplete (replyToKey, sequence, 1, "Package not found"); + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -619,24 +641,26 @@ if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = ValidateSchema(inBuffer); - if (length == 0) + if (length == 0) { + QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); + } else { - cIter->second->buffer = (uint8_t*) malloc(length); - cIter->second->bufferLen = length; - inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); // Publish a class-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -671,14 +695,14 @@ void ManagementBroker::deleteOrphanedAgentsLH() { - vector<uint64_t> deleteList; + vector<ObjectId> deleteList; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - uint64_t connectionRef = aIter->first; + ObjectId connectionRef = aIter->first; bool found = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { if (iter->first == connectionRef && !iter->second->isDeleted()) { found = true; @@ -692,10 +716,8 @@ } } - for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { - + for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) remoteAgents.erase(*dIter); - } deleteList.clear(); } @@ -705,7 +727,7 @@ string label; uint32_t requestedBank; uint32_t assignedBank; - uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjectsLH(); @@ -741,6 +763,7 @@ uint32_t outLen; EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -786,13 +809,77 @@ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::dispatchAgentCommandLH (Message& msg) +bool ManagementBroker::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; uint32_t sequence; string replyToKey; + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!CheckHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + std::map<acl::Property, string> params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + return false; + } + + return true; +} + +void ManagementBroker::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { @@ -823,7 +910,7 @@ else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) @@ -834,7 +921,7 @@ // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); + packages.insert(pair<string, ClassMap>(name, ClassMap())); QPID_LOG (debug, "ManagementBroker added package " << name); // Publish a package-indication message @@ -859,20 +946,18 @@ ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass* classInfo = new SchemaClass; - classInfo->writeSchemaCall = schemaCall; - cMap[key] = classInfo; - cIter = cMap.find (key); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cIter = cMap.find(key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -917,6 +1002,8 @@ for (uint16_t idx = 0; idx < methCount; idx++) { FieldTable ft; ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; int argCount = ft.getInt("argCount"); for (int mIdx = 0; mIdx < argCount; mIdx++) { FieldTable aft; @@ -924,10 +1011,41 @@ } } - if (evntCount != 0) - return 0; + for (uint16_t idx = 0; idx < evntCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } end = inBuffer.getPosition(); inBuffer.restore(); // restore original position return end - start; } + +Mutex& ManagementBroker::getMutex() +{ + return userLock; +} + +Buffer* ManagementBroker::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementBroker::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); + delete outBuffer; +} +
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Wed Sep 3 11:01:44 2008 @@ -47,7 +47,7 @@ ManagementBroker (); virtual ~ManagementBroker (); - void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + void configure (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); @@ -56,16 +56,15 @@ std::string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (ManagementObject* object, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); void clientAdded (void); bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); // Stubs for remote management agent calls - void init (std::string, uint16_t, uint16_t, bool) { assert(0); } + void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } @@ -88,7 +87,7 @@ { uint32_t objIdBank; std::string routingKey; - uint64_t connectionRef; + ObjectId connectionRef; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -97,7 +96,7 @@ // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap; + typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap; typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: @@ -133,12 +132,15 @@ size_t bufferLen; uint8_t* buffer; - SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {} + SchemaClass(uint32_t seq) : + writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(ManagementObject::writeSchemaCall_t call) : + writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; - typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap; + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; RemoteAgentMap remoteAgents; @@ -157,10 +159,10 @@ broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - Manageable* broker; + broker::Broker* broker; uint16_t bootSequence; - uint32_t localBank; uint32_t nextObjectId; + uint32_t brokerBank; uint32_t nextRemoteBank; uint32_t nextRequestSequence; bool clientWasAdded; @@ -168,6 +170,7 @@ # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; void writeData (); void PeriodicProcessing (void); @@ -179,7 +182,8 @@ std::string routingKey); void moveNewObjectsLH(); - void dispatchAgentCommandLH (broker::Message& msg); + bool authorizeAgentMessageLH(broker::Message& msg); + void dispatchAgentCommandLH(broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); void AddClass(PackageMap::iterator pIter, @@ -206,9 +210,12 @@ void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); size_t ValidateSchema(framing::Buffer&); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Wed Sep 3 11:01:44 2008 @@ -28,6 +28,62 @@ using namespace qpid::management; using namespace qpid::sys; +void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) +{ + first = + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); +} + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) + : agent(0) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); + second = object; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) + : agent(_agent) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48; + second = object; +} + +bool ObjectId::operator==(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return first == otherFirst && second == other.second; +} + +bool ObjectId::operator<(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +} + +void ObjectId::encode(framing::Buffer& buffer) +{ + if (agent == 0) + buffer.putLongLong(first); + else + buffer.putLongLong(first | agent->first); + buffer.putLongLong(second); +} + +void ObjectId::decode(framing::Buffer& buffer) +{ + first = buffer.getLongLong(); + second = buffer.getLongLong(); +} + int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (Buffer& buf) @@ -38,10 +94,10 @@ buf.putLongLong (uint64_t (Duration (now ()))); buf.putLongLong (createTime); buf.putLongLong (destroyTime); - buf.putLongLong (objectId); + objectId.encode(buf); } -void ManagementObject::setReference(uint64_t) {} +void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { static __thread int thisIndex = -1; @@ -54,3 +110,17 @@ return thisIndex; } +Mutex& ManagementObject::getMutex() +{ + return agent->getMutex(); +} + +Buffer* ManagementObject::startEventLH() +{ + return agent->startEventLH(); +} + +void ManagementObject::finishEventLH(Buffer* buf) +{ + agent->finishEventLH(buf); +} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Sep 3 11:01:44 2008 @@ -32,6 +32,34 @@ class Manageable; class ManagementAgent; +class ObjectId; + + +class AgentAttachment { + friend class ObjectId; +private: + uint64_t first; +public: + AgentAttachment() : first(0) {} + void setBanks(uint32_t broker, uint32_t bank); +}; + + +class ObjectId { +private: + const AgentAttachment* agent; + uint64_t first; + uint64_t second; +public: + ObjectId() : agent(0), first(0), second(0) {} + ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } + ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object); + ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object); + bool operator==(const ObjectId &other) const; + bool operator<(const ObjectId &other) const; + void encode(framing::Buffer& buffer); + void decode(framing::Buffer& buffer); +}; class ManagementObject { @@ -39,7 +67,7 @@ uint64_t createTime; uint64_t destroyTime; - uint64_t objectId; + ObjectId objectId; bool configChanged; bool instChanged; bool deleted; @@ -84,11 +112,15 @@ int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* buf); + public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); ManagementObject (ManagementAgent* _agent, Manageable* _core) : - destroyTime(0), objectId (0), configChanged(true), + destroyTime(0), configChanged(true), instChanged(true), deleted(false), coreObject(_core), agent(_agent) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} @@ -100,14 +132,14 @@ virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; - virtual void setReference (uint64_t objectId); + virtual void setReference (ObjectId objectId); virtual std::string& getClassName (void) = 0; virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; - void setObjectId (uint64_t oid) { objectId = oid; } - uint64_t getObjectId (void) { return objectId; } + void setObjectId (ObjectId oid) { objectId = oid; } + ObjectId getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } virtual bool getInstChanged (void) { return instChanged; } inline void setAllChanged (void) { @@ -120,10 +152,9 @@ deleted = true; } inline bool isDeleted (void) { return deleted; } - inline sys::Mutex& getLock() { return accessLock; } }; -typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap; +typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; }} Modified: incubator/qpid/trunk/qpid/python/qpid/management.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/management.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Sep 3 11:01:44 2008 @@ -69,6 +69,53 @@ for cell in row: setattr (self, cell[0], cell[1]) +class objectId(object): + """ Object that represents QMF object identifiers """ + + def __init__(self, codec): + self.first = codec.read_uint64() + self.second = codec.read_uint64() + + def __cmp__(self, other): + if other == None: + return 1 + if self.first < other.first: + return -1 + if self.first > other.first: + return 1 + if self.second < other.second: + return -1 + if self.second > other.second: + return 1 + return 0 + + + def index(self): + return (self.first, self.second) + + def getFlags(self): + return (self.first & 0xF000000000000000) >> 60 + + def getSequence(self): + return (self.first & 0x0FFF000000000000) >> 48 + + def getBroker(self): + return (self.first & 0x0000FFFFF0000000) >> 28 + + def getBank(self): + return self.first & 0x000000000FFFFFFF + + def getObject(self): + return self.second + + def isDurable(self): + return self.getSequence() == 0 + + def encode(self, codec): + codec.write_uint64(self.first) + codec.write_uint64(self.second) + + class methodResult: """ Object that contains the result of a method call """ @@ -308,6 +355,8 @@ self.handleClassInd (ch, codec) elif hdr[0] == 'h': self.handleHeartbeat (ch, codec) + elif hdr[0] == 'e': + self.handleEvent (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) ch.accept(msg) @@ -386,7 +435,7 @@ elif typecode == 9: # DELTATIME codec.write_uint64 (long (value)) elif typecode == 10: # REF - codec.write_uint64 (long (value)) + value.encode(codec) elif typecode == 11: # BOOL codec.write_uint8 (int (value)) elif typecode == 12: # FLOAT @@ -429,7 +478,7 @@ elif typecode == 9: # DELTATIME data = codec.read_uint64 () elif typecode == 10: # REF - data = codec.read_uint64 () + data = objectId(codec) elif typecode == 11: # BOOL data = codec.read_uint8 () elif typecode == 12: # FLOAT @@ -551,9 +600,9 @@ ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): - pname = str (codec.read_str8 ()) - cname = str (codec.read_str8 ()) - hash = codec.read_bin128 () + pname = str (codec.read_str8()) + cname = str (codec.read_str8()) + hash = codec.read_bin128() if pname not in self.packages: return @@ -574,6 +623,32 @@ if self.ctrlCb != None: self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp) + def handleEvent (self, ch, codec): + if self.eventCb == None: + return + timestamp = codec.read_uint64() + objId = objectId(codec) + packageName = str(codec.read_str8()) + className = str(codec.read_str8()) + hash = codec.read_bin128() + name = str(codec.read_str8()) + classKey = (packageName, className, hash) + if classKey not in self.schema: + return; + schemaClass = self.schema[classKey] + row = [] + es = schemaClass['E'] + arglist = None + for ename in es: + (edesc, eargs) = es[ename] + if ename == name: + arglist = eargs + if arglist == None: + return + for arg in arglist: + row.append((arg[0], self.decodeValue(codec, arg[1]))) + self.eventCb(ch.context, classKey, objId, name, row) + def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) @@ -597,22 +672,23 @@ configs = [] insts = [] methods = {} - events = [] + events = {} configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) insts.append (("id", 4, None, None)) for idx in range (configCount): ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - access = ft["access"] - index = ft["index"] - unit = None - min = None - max = None - maxlen = None - desc = None + name = str (ft["name"]) + type = ft["type"] + access = ft["access"] + index = ft["index"] + optional = ft["optional"] + unit = None + min = None + max = None + maxlen = None + desc = None for key, value in ft.items (): if key == "unit": @@ -626,7 +702,7 @@ elif key == "desc": desc = str (value) - config = (name, type, unit, desc, access, index, min, max, maxlen) + config = (name, type, unit, desc, access, index, min, max, maxlen, optional) configs.append (config) for idx in range (instCount): @@ -685,6 +761,33 @@ args.append (arg) methods[mname] = (mdesc, args) + for idx in range (eventCount): + ft = codec.read_map () + ename = str (ft["name"]) + argCount = ft["argCount"] + if "desc" in ft: + edesc = str (ft["desc"]) + else: + edesc = None + + args = [] + for aidx in range (argCount): + ft = codec.read_map () + name = str (ft["name"]) + type = ft["type"] + unit = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = str (value) + elif key == "desc": + desc = str (value) + + arg = (name, type, unit, desc) + args.append (arg) + events[ename] = (edesc, args) + schemaClass = {} schemaClass['C'] = configs schemaClass['I'] = insts @@ -695,6 +798,22 @@ if self.schemaCb != None: self.schemaCb (ch.context, classKey, configs, insts, methods, events) + def parsePresenceMasks(self, codec, schemaClass): + """ Generate a list of not-present properties """ + excludeList = [] + bit = 0 + for element in schemaClass['C'][1:]: + if element[9] == 1: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(element[0]) + bit = bit * 2 + if bit == 256: + bit = 0 + return excludeList + def parseContent (self, ch, cls, codec, seq=0): """ Parse a received content message. """ if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: @@ -716,21 +835,26 @@ timestamps.append (codec.read_uint64 ()) # Current Time timestamps.append (codec.read_uint64 ()) # Create Time timestamps.append (codec.read_uint64 ()) # Delete Time - + objId = objectId(codec) schemaClass = self.schema[classKey] if cls == 'C' or cls == 'B': - for element in schemaClass['C'][:]: + notPresent = self.parsePresenceMasks(codec, schemaClass) + + if cls == 'C' or cls == 'B': + row.append(("id", objId)) + for element in schemaClass['C'][1:]: tc = element[1] name = element[0] - data = self.decodeValue (codec, tc) - row.append ((name, data)) + if name in notPresent: + row.append((name, None)) + else: + data = self.decodeValue(codec, tc) + row.append((name, data)) if cls == 'I' or cls == 'B': - if cls == 'B': - start = 1 - else: - start = 0 - for element in schemaClass['I'][start:]: + if cls == 'I': + row.append(("id", objId)) + for element in schemaClass['I'][1:]: tc = element[1] name = element[0] data = self.decodeValue (codec, tc) @@ -763,9 +887,12 @@ codec = Codec (self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M'), sequence) - codec.write_uint64 (objId) # ID of object + objId.encode(codec) + codec.write_str8 (classId[0]) + codec.write_str8 (classId[1]) + codec.write_bin128 (classId[2]) codec.write_str8 (methodName) - bank = (objId & 0x0000FFFFFF000000) >> 24 + bank = objId.getBank() # Encode args according to schema if classId not in self.schema: Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Wed Sep 3 11:01:44 2008 @@ -71,14 +71,14 @@ # def registerObjId (self, objId): - if not objId in self.idBackMap: - self.idBackMap[objId] = self.nextId + if not objId.index() in self.idBackMap: + self.idBackMap[objId.index()] = self.nextId self.idMap[self.nextId] = objId self.nextId += 1 - def displayObjId (self, objId): - if objId in self.idBackMap: - return self.idBackMap[objId] + def displayObjId (self, objIdIndex): + if objIdIndex in self.idBackMap: + return self.idBackMap[objIdIndex] else: return 0 @@ -86,7 +86,7 @@ if displayId in self.idMap: return self.idMap[displayId] else: - return 0 + return None def displayClassName (self, cls): (packageName, className, hash) = cls @@ -102,19 +102,20 @@ self.tables[className] = {} # Register the ID so a more friendly presentation can be displayed - id = long (list[0][1]) - self.registerObjId (id) + objId = list[0][1] + oidx = objId.index() + self.registerObjId (objId) # If this object hasn't been seen before, create a new object record with # the timestamps and empty lists for configuration and instrumentation data. - if id not in self.tables[className]: - self.tables[className][id] = (timestamps, [], []) + if oidx not in self.tables[className]: + self.tables[className][oidx] = (timestamps, [], []) - (unused, oldConf, oldInst) = self.tables[className][id] + (unused, oldConf, oldInst) = self.tables[className][oidx] # For config updates, simply replace old config list with the new one. if context == 0: #config - self.tables[className][id] = (timestamps, list, oldInst) + self.tables[className][oidx] = (timestamps, list, oldInst) # For instrumentation updates, carry the minimum and maximum values for # "hi-lo" stats forward. @@ -132,7 +133,7 @@ if oldInst[idx][1] < value: value = oldInst[idx][1] newInst.append ((key, value)) - self.tables[className][id] = (timestamps, oldConf, newInst) + self.tables[className][oidx] = (timestamps, oldConf, newInst) finally: self.lock.release () @@ -211,11 +212,13 @@ pass def refName (self, oid): - if oid == 0: + if oid == None: return "NULL" - return str (self.displayObjId (oid)) + return str (self.displayObjId (oid.index())) def valueDisplay (self, classKey, key, value): + if value == None: + return "<NULL>" for kind in range (2): schema = self.schema[classKey][kind] for item in schema: @@ -437,7 +440,7 @@ if classKey in self.tables: ids = self.listOfIds(classKey, tokens[1:]) for objId in ids: - (ts, config, inst) = self.tables[classKey][self.rawObjId(objId)] + (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()] createTime = self.disp.timestamp (ts[1]) destroyTime = "-" if ts[2] > 0: @@ -486,32 +489,32 @@ rows = [] timestamp = None - config = self.tables[classKey][ids[0]][1] + config = self.tables[classKey][ids[0].index()][1] for eIdx in range (len (config)): key = config[eIdx][0] if key != "id": row = ("property", key) for id in ids: if timestamp == None or \ - timestamp < self.tables[classKey][id][0][0]: - timestamp = self.tables[classKey][id][0][0] - (key, value) = self.tables[classKey][id][1][eIdx] + timestamp < self.tables[classKey][id.index()][0][0]: + timestamp = self.tables[classKey][id.index()][0][0] + (key, value) = self.tables[classKey][id.index()][1][eIdx] row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) - inst = self.tables[classKey][ids[0]][2] + inst = self.tables[classKey][ids[0].index()][2] for eIdx in range (len (inst)): key = inst[eIdx][0] if key != "id": row = ("statistic", key) for id in ids: - (key, value) = self.tables[classKey][id][2][eIdx] + (key, value) = self.tables[classKey][id.index()][2][eIdx] row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) titleRow = ("Type", "Element") for id in ids: - titleRow = titleRow + (self.refName (id),) + titleRow = titleRow + (self.refName(id),) caption = "Object of type %s.%s:" % (classKey[0], classKey[1]) if timestamp != None: caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" @@ -563,13 +566,15 @@ access = self.accessName (config[4]) extra = "" if config[5] == 1: - extra = extra + "index " + extra += "index " if config[6] != None: - extra = extra + "Min: " + str (config[6]) + extra += "Min: " + str(config[6]) + " " if config[7] != None: - extra = extra + "Max: " + str (config[7]) + extra += "Max: " + str(config[7]) + " " if config[8] != None: - extra = extra + "MaxLen: " + str (config[8]) + extra += "MaxLen: " + str(config[8]) + " " + if config[9] == 1: + extra += "optional " rows.append ((name, typename, unit, access, extra, desc)) for config in self.schema[classKey][1]: @@ -613,7 +618,7 @@ def getClassForId (self, objId): """ Given an object ID, return the class key for the referenced object """ for classKey in self.tables: - if objId in self.tables[classKey]: + if objId.index() in self.tables[classKey]: return classKey return None @@ -659,14 +664,19 @@ def makeIdRow (self, displayId): if displayId in self.idMap: - rawId = self.idMap[displayId] + objId = self.idMap[displayId] else: return None - return (displayId, - rawId, - (rawId & 0x7FFF000000000000) >> 48, - (rawId & 0x0000FFFFFF000000) >> 24, - (rawId & 0x0000000000FFFFFF)) + if objId.getFlags() == 0: + flags = "" + else: + flags = str(objId.getFlags()) + seq = objId.getSequence() + if seq == 0: + seqText = "<durable>" + else: + seqText = str(seq) + return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject())) def listIds (self, select): rows = [] @@ -683,7 +693,7 @@ return rows.append(row) self.disp.table("Translation of Display IDs:", - ("DisplayID", "RawID", "BootSequence", "Bank", "Object"), + ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"), rows) def do_list (self, data): Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/specs/management-schema.xml (original) +++ incubator/qpid/trunk/qpid/specs/management-schema.xml Wed Sep 3 11:01:44 2008 @@ -61,18 +61,16 @@ =============================================================== --> <class name="Broker"> - <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/> - <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> - <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> - <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> - <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> - <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> - <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> - <property name="clusterName" type="sstr" access="RO" - desc="Name of cluster this server is a member of"/> - <property name="version" type="sstr" access="RO" desc="Running software version"/> - <property name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/> - <property name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/> + <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/> + <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> + <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> + <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> + <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> + <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> + <property name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of"/> + <property name="version" type="sstr" access="RO" desc="Running software version"/> + <property name="dataDir" type="sstr" access="RO" optional="y" desc="Persistent configuration storage location"/> <method name="joinCluster"> <arg name="clusterName" dir="I" type="sstr"/> @@ -94,6 +92,17 @@ <arg name="username" dir="I" type="sstr"/> <arg name="password" dir="I" type="sstr"/> </method> + + <event name="agentConnect" desc="QMF Management Agent has connected to the broker"> + <arg name="remoteAddress" type="sstr"/> + <arg name="label" type="sstr"/> + <arg name="brokerBank" type="uint32"/> + <arg name="agentBank" type="uint32"/> + </event> + + <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker"> + <arg name="remoteAddress" type="sstr"/> + </event> </class> <!-- Modified: incubator/qpid/trunk/qpid/specs/management-types.xml URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=691700&r1=691699&r2=691700&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/specs/management-types.xml (original) +++ incubator/qpid/trunk/qpid/specs/management-types.xml Wed Sep 3 11:01:44 2008 @@ -19,7 +19,7 @@ under the License. --> -<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/> +<type name="objId" base="REF" cpp="ObjectId" encode="#.encode(@)" decode="#.decode(@)" accessor="direct" init="0"/> <type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" accessor="direct" init="0"/> <type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" accessor="direct" init="0"/> <type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" accessor="direct" init="0"/>
