Author: nsantos
Date: Thu May 15 18:53:17 2008
New Revision: 656920

URL: http://svn.apache.org/viewvc?rev=656920&view=rev
Log:
QPID-1065: Management messages may be lost if client attach hits a small time 
window -- patch supplied by Ted Ross

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Thu 
May 15 18:53:17 2008
@@ -55,6 +55,7 @@
     nextObjectId   = 1;
     bootSequence   = 1;
     nextRemoteBank = 10;
+    clientWasAdded = false;
 
     // Get from file or generate and save to file.
     if (dataDir.empty ())
@@ -129,16 +130,16 @@
 }
 
 void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
-                                   broker::Exchange::shared_ptr _dexchange)
+                                    broker::Exchange::shared_ptr _dexchange)
 {
     mExchange = _mexchange;
     dExchange = _dexchange;
 }
 
 void ManagementBroker::RegisterClass (string   packageName,
-                                     string   className,
-                                     uint8_t* md5Sum,
-                                     ManagementObject::writeSchemaCall_t schemaCall)
+                                      string   className,
+                                      uint8_t* md5Sum,
+                                      ManagementObject::writeSchemaCall_t schemaCall)
 {
     Mutex::ScopedLock lock (userLock);
     PackageMap::iterator pIter = FindOrAddPackage (packageName);
@@ -146,15 +147,22 @@
 }
 
 void ManagementBroker::addObject (ManagementObject::shared_ptr object,
-                                 uint32_t                     persistId,
-                                 uint32_t                     persistBank)
+                                  uint32_t                     persistId,
+                                  uint32_t                     persistBank)
 {
     Mutex::ScopedLock lock (userLock);
     uint64_t objectId;
 
     if (persistId == 0)
+    {
         objectId = ((uint64_t) bootSequence) << 48 |
             ((uint64_t) localBank) << 24 | nextObjectId++;
+        if ((nextObjectId & 0xFF000000) != 0)
+        {
+            nextObjectId = 1;
+            localBank++;
+        }
+    }
     else
         objectId = ((uint64_t) persistBank) << 24 | persistId;
 
@@ -175,13 +183,9 @@
 
 void ManagementBroker::clientAdded (void)
 {
-    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
-         iter != managementObjects.end ();
-         iter++)
-    {
-        ManagementObject::shared_ptr object = iter->second;
-        object->setAllChanged ();
-    }
+    Mutex::ScopedLock lock (userLock);
+
+    clientWasAdded = true;
 }
 
 void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -258,6 +262,18 @@
         SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
     }
 
+    if (clientWasAdded)
+    {
+        clientWasAdded = false;
+        for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+             iter != managementObjects.end ();
+             iter++)
+        {
+            ManagementObject::shared_ptr object = iter->second;
+            object->setAllChanged ();
+        }
+    }
+
     if (managementObjects.empty ())
         return;
         
@@ -542,9 +558,6 @@
                 outBuffer.reset ();
                 SendBuffer (outBuffer, outLen, dExchange, replyToKey);
             }
-
-            clientAdded ();
-            // TODO: Send client-added to each remote agent.
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu 
May 15 18:53:17 2008
@@ -155,6 +155,7 @@
     uint32_t                     localBank;
     uint32_t                     nextObjectId;
     uint32_t                     nextRemoteBank;
+    bool                         clientWasAdded;
 
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp 
Thu May 15 18:53:17 2008
@@ -53,6 +53,14 @@
     TopicExchange::route (msg, routingKey, args);
 }
 
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+                               const string& routingKey,
+                               const qpid::framing::FieldTable* args)
+{
+    managementAgent->clientAdded ();
+    return TopicExchange::bind (queue, routingKey, args);
+}
+
 void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
 {
     managementAgent = agent;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=656920&r1=656919&r2=656920&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Thu 
May 15 18:53:17 2008
@@ -46,6 +46,10 @@
                         const string& routingKey,
                         const qpid::framing::FieldTable* args);
 
+    virtual bool bind (Queue::shared_ptr queue,
+                       const string& routingKey,
+                       const qpid::framing::FieldTable* args);
+
     void setManagmentAgent (management::ManagementBroker* agent);
 
     virtual ~ManagementExchange();


Reply via email to