Author: nsantos
Date: Thu May 15 18:53:17 2008
New Revision: 656920
URL: http://svn.apache.org/viewvc?rev=656920&view=rev
Log:
QPID-1065: Management messages may be lost if client attach hits a small time
window -- patch supplied by Ted Ross
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Thu
May 15 18:53:17 2008
@@ -55,6 +55,7 @@
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ clientWasAdded = false;
// Get from file or generate and save to file.
if (dataDir.empty ())
@@ -129,16 +130,16 @@
}
void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
- broker::Exchange::shared_ptr _dexchange)
+ broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
void ManagementBroker::RegisterClass (string packageName,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t
schemaCall)
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t
schemaCall)
{
Mutex::ScopedLock lock (userLock);
PackageMap::iterator pIter = FindOrAddPackage (packageName);
@@ -146,15 +147,22 @@
}
void ManagementBroker::addObject (ManagementObject::shared_ptr object,
- uint32_t persistId,
- uint32_t persistBank)
+ uint32_t persistId,
+ uint32_t persistBank)
{
Mutex::ScopedLock lock (userLock);
uint64_t objectId;
if (persistId == 0)
+ {
objectId = ((uint64_t) bootSequence) << 48 |
((uint64_t) localBank) << 24 | nextObjectId++;
+ if ((nextObjectId & 0xFF000000) != 0)
+ {
+ nextObjectId = 1;
+ localBank++;
+ }
+ }
else
objectId = ((uint64_t) persistBank) << 24 | persistId;
@@ -175,13 +183,9 @@
void ManagementBroker::clientAdded (void)
{
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- object->setAllChanged ();
- }
+ Mutex::ScopedLock lock (userLock);
+
+ clientWasAdded = true;
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -258,6 +262,18 @@
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
+ if (clientWasAdded)
+ {
+ clientWasAdded = false;
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ object->setAllChanged ();
+ }
+ }
+
if (managementObjects.empty ())
return;
@@ -542,9 +558,6 @@
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-
- clientAdded ();
- // TODO: Send client-added to each remote agent.
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu
May 15 18:53:17 2008
@@ -155,6 +155,7 @@
uint32_t localBank;
uint32_t nextObjectId;
uint32_t nextRemoteBank;
+ bool clientWasAdded;
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
Thu May 15 18:53:17 2008
@@ -53,6 +53,14 @@
TopicExchange::route (msg, routingKey, args);
}
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args)
+{
+ managementAgent->clientAdded ();
+ return TopicExchange::bind (queue, routingKey, args);
+}
+
void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
{
managementAgent = agent;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Thu
May 15 18:53:17 2008
@@ -46,6 +46,10 @@
const string& routingKey,
const qpid::framing::FieldTable* args);
+ virtual bool bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
void setManagmentAgent (management::ManagementBroker* agent);
virtual ~ManagementExchange();