When an object is deleted, and the object has children, the delete meesage is
sent for each deleted object to PBE.  Since there are a lot of messages in the
cascade delete from IMMND to PBE at once, there is a limitation that the cascade
delete should not be done on object that contains more than 10000 object.  More
than 10000 object may cause buffer overload (e.g. TIPC_ERR_OVERLOAD), and
messages might be lost.

The improvement should send only one message to PBE which will contain only the
root object. The rest of cascade delete will be on PBE side.
---
 src/imm/common/immpbe_dump.cc | 548 ++++++++++++++++++++++++++++++------------
 src/imm/immnd/ImmModel.cc     |  18 +-
 src/imm/immnd/ImmModel.h      |   3 +-
 src/imm/immnd/immnd_evt.c     | 210 ++++++++--------
 4 files changed, 508 insertions(+), 271 deletions(-)

diff --git a/src/imm/common/immpbe_dump.cc b/src/imm/common/immpbe_dump.cc
index 11af674..17d8eb9 100644
--- a/src/imm/common/immpbe_dump.cc
+++ b/src/imm/common/immpbe_dump.cc
@@ -33,6 +33,8 @@
 #include <stdint.h>
 #include <sys/stat.h>
 #include <libgen.h>
+#include <set>
+#include <vector>
 
 #include <saAis.h>
 #include "base/osaf_extended_name.h"
@@ -146,7 +148,24 @@ static const char *preparedSql[] = {
 
 static sqlite3_stmt *preparedStmt[SQL_STMT_SIZE] = {NULL};
 
-static int prepareSqlStatements(sqlite3 *dbHandle) {
+typedef struct {
+  unsigned obj_id;
+  unsigned class_id;
+  char *dn;
+} ObjectInfo;
+
+typedef std::set<ObjectInfo *> ObjectSet;
+typedef std::map<std::string, ObjectInfo *> ReverseDnMap;
+typedef std::map<unsigned int, std::string> ClassNameMap;
+
+// Used for collecting objects and classes
+typedef std::map<unsigned, ObjectSet> ClassInstanceMap;
+
+static ObjectSet sObjectSet;
+static ReverseDnMap sReverseDnMap;
+static ClassNameMap sClassNameMap;
+
+static bool prepareSqlStatements(sqlite3 *dbHandle) {
   int i;
   int rc;
 
@@ -155,11 +174,11 @@ static int prepareSqlStatements(sqlite3 *dbHandle) {
                             NULL);
     if (rc != SQLITE_OK) {
       LOG_ER("Failed to prepare SQL statement for: %s", preparedSql[i]);
-      return -1;
+      return false;
     }
   }
 
-  return 0;
+  return true;
 }
 
 int finalizeSqlStatement(void *stmt) {
@@ -546,6 +565,327 @@ void pbeAtomicSwitchFile(const char *filePath, 
std::string localTmpFilename) {
   }
 }
 
+static std::string reverseDn(std::string& input) {
+/* reverseDn() has been copied from ReverseDn() in imm_xmlw_dump.cc */
+  std::string result = "";
+  size_t start_cut = 0;
+  size_t comma_pos = 0;
+
+  do {
+    size_t start_search = start_cut;
+    while ((comma_pos = input.find(",", start_search)) ==
+           input.find("\\,", start_search) + 1)
+      start_search = input.find(",", start_search) +
+                     1; /* Skip the "\," by shifting start position*/
+
+    /* Insert RDN to the begin of the result */
+    if (!result.empty()) result.insert(0, ",");
+    result.insert(0, input, start_cut, comma_pos - start_cut);
+
+    /* Next RDN */
+    start_cut = comma_pos + 1;
+  } while (comma_pos != std::string::npos);
+
+  return result;
+}
+
+static void reverseAndInsertDn(std::string &dn,
+                               unsigned obj_id,
+                               unsigned class_id) {
+  std::string revdn;
+  ObjectInfo *info;
+
+  revdn = reverseDn(dn);
+
+  info = (ObjectInfo *)malloc(sizeof(ObjectInfo));
+  info->obj_id = obj_id;
+  info->class_id = class_id;
+  info->dn = strdup(dn.c_str());
+
+  sObjectSet.insert(info);
+  sReverseDnMap[revdn] = info;
+}
+
+static ObjectInfo *findObjectInfo(std::string &dn) {
+  std::string revDn = reverseDn(dn);
+  auto obj = sReverseDnMap.find(revDn);
+  return (obj != sReverseDnMap.end()) ? obj->second : NULL;
+}
+
+static bool prepareLocalData(sqlite3 *dbHandle) {
+  const char *classSql = "SELECT class_id, class_name FROM classes";
+  const char *objSql = "SELECT dn, obj_id, class_id FROM objects";
+  sqlite3_stmt *stmt = NULL;
+  int rc;
+  bool ret = false;
+  unsigned obj_id;
+  unsigned class_id;
+  char *class_name;
+  std::string dn;
+  int count = 0;
+
+  TRACE_ENTER();
+
+  rc = sqlite3_prepare_v2(dbHandle, classSql, -1, &stmt, NULL);
+  if (rc != SQLITE_OK) {
+    LOG_ER("Failed to prepare SQL statement for: %s", classSql);
+    goto failed;
+  }
+
+  while((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+    class_id = (unsigned int)sqlite3_column_int(stmt, 0);
+    class_name = (char *)sqlite3_column_text(stmt, 1);
+
+    sClassNameMap[class_id] = class_name;
+
+    ++count;
+  }
+  if (rc != SQLITE_DONE) {
+    LOG_ER("SQL statement ('%s') failed. Error code: %d", classSql, rc);
+    goto failed;
+  }
+  sqlite3_finalize(stmt);
+  stmt = NULL;
+
+  TRACE("Added %d classes local class map", count);
+
+  rc = sqlite3_prepare_v2(dbHandle, objSql, -1, &stmt, NULL);
+  if (rc != SQLITE_OK) {
+    LOG_ER("Failed to prepare SQL statement for: %s", objSql);
+    goto failed;
+  }
+
+  count = 0;
+  while((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+    dn = (char *)sqlite3_column_text(stmt, 0);
+    obj_id = sqlite3_column_int(stmt, 1);
+    class_id = sqlite3_column_int(stmt, 2);
+
+    reverseAndInsertDn(dn, obj_id, class_id);
+
+    ++count;
+  }
+  if (rc != SQLITE_DONE) {
+    LOG_ER("SQL statement ('%s') failed. Error code: %d", objSql, rc);
+    goto failed;
+  }
+
+  TRACE("Added %d reverse DNs to reverse_dn table", count);
+
+  ret = true;
+
+failed:
+  if(stmt) {
+    sqlite3_finalize(stmt);
+  }
+
+  TRACE_LEAVE();
+
+  return ret;
+}
+
+static void collectObjectInfo(std::string &root,
+                              ObjectSet &objects,
+                              ClassInstanceMap &class_instances) {
+  /* The function returns the root object and all its children in 'objects'.
+   * 'class_instances' contains all classes with classes instances in
+   * returned objects.
+   */
+  std::string revDn = reverseDn(root);
+  std::string childDnStart = revDn + ",";
+  auto it = sReverseDnMap.find(revDn);
+
+  if (it == sReverseDnMap.end()) {
+    return;
+  }
+  // Add root object
+  objects.insert(it->second);
+  class_instances[it->second->class_id].insert(it->second);
+
+  // Skip possible objects between "dn" and "dn,"
+  ++it;
+  while(it != sReverseDnMap.end()
+      && !strncmp(it->first.c_str(), revDn.c_str(), revDn.size())
+      && strncmp(it->first.c_str(), childDnStart.c_str(), 
childDnStart.size())) {
+    ++it;
+  }
+
+  // Add all children objects
+  for(; it != sReverseDnMap.end()
+        && !strncmp(it->first.c_str(), childDnStart.c_str(), 
childDnStart.size());
+        ++it) {
+    objects.insert(it->second);
+    class_instances[it->second->class_id].insert(it->second);
+  }
+}
+
+static bool deleteObjectList(sqlite3 *dbHandle, ObjectSet &objectSet, bool 
*badfile) {
+  sqlite3_stmt *stmt = NULL;
+  int rc = 0;
+  bool ret = false;
+  unsigned object_id;
+
+  TRACE_ENTER();
+
+  if (badfile) {
+    *badfile = false;
+  }
+
+  for (auto &obj : objectSet) {
+    object_id = obj->obj_id;
+
+    /*
+      First, delete the root object tuple in objects for obj_id.
+     */
+    stmt = preparedStmt[SQL_DEL_OBJECTS];
+    if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
+      LOG_ER("Failed to bind obj_id with error code: %d", rc);
+      goto failed;
+    }
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_DONE) {
+      LOG_ER("SQL statement ('%s') failed because:\n %s",
+          preparedSql[SQL_DEL_OBJECTS], sqlite3_errmsg(dbHandle));
+      if (badfile) {
+        *badfile = true;
+      }
+      goto failed;
+    }
+    sqlite3_reset(stmt);
+    stmt = NULL;
+    TRACE("Deleted %u values", sqlite3_changes(dbHandle));
+
+    /*
+      Second, delete from objects_int_multi, objects_real_multi,
+      objects_text_multi
+      where obj_id == OBJ_ID
+     */
+    stmt = preparedStmt[SQL_DEL_OBJ_INT_MULTI_ID];
+    if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
+      LOG_ER("Failed to bind obj_id with error code: %d", rc);
+      goto failed;
+    }
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_DONE) {
+      LOG_ER("SQL statement ('%s') failed because:\n %s",
+             preparedSql[SQL_DEL_OBJ_INT_MULTI_ID], sqlite3_errmsg(dbHandle));
+      goto failed;
+    }
+    TRACE("Deleted %u values", sqlite3_changes(dbHandle));
+    sqlite3_reset(stmt);
+    stmt = NULL;
+
+    stmt = preparedStmt[SQL_DEL_OBJ_REAL_MULTI_ID];
+    if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
+      LOG_ER("Failed to bind obj_id with error code: %d", rc);
+      goto failed;
+    }
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_DONE) {
+      LOG_ER("SQL statement ('%s') failed because:\n %s",
+             preparedSql[SQL_DEL_OBJ_REAL_MULTI_ID], sqlite3_errmsg(dbHandle));
+      goto failed;
+    }
+    TRACE("Deleted %u values", sqlite3_changes(dbHandle));
+    sqlite3_reset(stmt);
+    stmt = NULL;
+
+    stmt = preparedStmt[SQL_DEL_OBJ_TEXT_MULTI_ID];
+    if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
+      LOG_ER("Failed to bind obj_id with error code: %d", rc);
+      goto failed;
+    }
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_DONE) {
+      LOG_ER("SQL statement ('%s') failed because:\n %s",
+             preparedSql[SQL_DEL_OBJ_TEXT_MULTI_ID], sqlite3_errmsg(dbHandle));
+      goto failed;
+    }
+    TRACE("Deleted %u values", sqlite3_changes(dbHandle));
+    sqlite3_reset(stmt);
+    stmt = NULL;
+  }
+
+  ret = true;
+
+failed:
+  if (stmt) {
+    sqlite3_reset(stmt);
+  }
+  TRACE_LEAVE();
+
+  return ret;
+}
+
+static bool deleteClassInstances(sqlite3 *dbHandle,
+                                 ClassInstanceMap &classInstances) {
+  sqlite3_stmt *stmt = NULL;
+  int rc = 0;
+  std::string className;
+  std::string sql;
+
+  TRACE_ENTER();
+
+  for (auto &cii : classInstances) {
+    auto cmi = sClassNameMap.find(cii.first);
+    assert(cmi != sClassNameMap.end());
+
+    sql = "DELETE FROM '";
+    sql.append(cmi->second);
+    sql.append("' WHERE obj_id = ?");
+
+    rc = sqlite3_prepare_v2(dbHandle, sql.c_str(), -1, &stmt, NULL);
+    if (rc != SQLITE_OK) {
+      LOG_ER("Failed to prepare SQL statement for: %s", sql.c_str());
+      goto failed;
+    }
+
+    for (auto &it : cii.second) {
+      if ((rc = sqlite3_bind_int(stmt, 1, it->obj_id)) != SQLITE_OK) {
+        LOG_ER("Failed to bind obj_id with error code: %d", rc);
+        goto failed;
+      }
+      rc = sqlite3_step(stmt);
+      if (rc != SQLITE_DONE) {
+        LOG_ER("SQL statement ('%s') failed because:\n %s",
+               sql.c_str(), sqlite3_errmsg(dbHandle));
+        goto failed;
+      }
+      sqlite3_reset(stmt);
+    }
+
+    sqlite3_finalize(stmt);
+    stmt = NULL;
+  }
+
+  return true;
+
+failed:
+  if (stmt) {
+    sqlite3_finalize(stmt);
+  }
+  return false;
+}
+
+static void removeObject(std::string &dn) {
+  std::string revDn;
+
+  revDn = reverseDn(dn);
+  sReverseDnMap.erase(revDn);
+}
+
+static void removeObjects(ObjectSet &objects) {
+  std::string dn;
+
+  for (auto obj : objects) {
+    sObjectSet.erase(obj);
+    dn = obj->dn;
+    removeObject(dn);
+    free(obj->dn);
+    free(obj);
+  }
+}
+
 void *pbeRepositoryInit(const char *filePath, bool create,
                         std::string &localTmpFilename) {
   int fd = (-1);
@@ -719,7 +1059,9 @@ void *pbeRepositoryInit(const char *filePath, bool create,
     TRACE("Successfully executed %s", sql_tr[ix]);
   }
 
-  prepareSqlStatements(dbHandle);
+  if (!prepareSqlStatements(dbHandle)) {
+    goto bailout;
+  }
 
   *sPbeFileName = std::string(filePath);
   if (localTmpDir) free(localTmpDir);
@@ -822,7 +1164,13 @@ re_attach:
   *sPbeFileName =
       std::string(filePath); /* Avoid apend to presumed empty string */
 
-  prepareSqlStatements(dbHandle);
+  if (!prepareSqlStatements(dbHandle)) {
+    goto bailout;
+  }
+
+  if (!prepareLocalData(dbHandle)) {
+    goto bailout;
+  }
 
   TRACE_LEAVE();
   return dbHandle;
@@ -1087,6 +1435,8 @@ ClassInfo *classToPBE(std::string classNameString, 
SaImmHandleT immHandle,
     goto bailout;
   }
 
+  sClassNameMap[class_id] = classNameString;
+
   TRACE_LEAVE();
   return classInfo;
 
@@ -1200,6 +1550,8 @@ void deleteClassToPBE(std::string classNameString, void 
*db_handle,
     TRACE("Dropped table %s rows:%u", classNameString.c_str(), rowsModified);
   }
 
+  sClassNameMap.erase(theClass->mClassId);
+
   TRACE_LEAVE();
   return;
 
@@ -2046,6 +2398,10 @@ unsigned int purgeInstancesOfClassToPBE(SaImmHandleT 
immHandle,
   sqlite3 *dbHandle = (sqlite3 *)db_handle;
   const char *classNamePar = className.c_str();
   unsigned int nrofDeletes = 0;
+  ObjectSet objects;
+  ObjectInfo *object;
+  ClassInstanceMap classInstances;
+  std::string dn;
   TRACE_ENTER();
 
   searchParam.searchOneAttr.attrName = (SaImmAttrNameT)SA_IMM_ATTR_CLASS_NAME;
@@ -2084,11 +2440,34 @@ unsigned int purgeInstancesOfClassToPBE(SaImmHandleT 
immHandle,
 
     // assert(attrs[0] == NULL);
 
-    objectDeleteToPBE(std::string(osaf_extended_name_borrow(&objectName)),
-                      db_handle);
+//    objectDeleteToPBE(std::string(osaf_extended_name_borrow(&objectName)),
+//                      db_handle);
+
+    dn = std::string(osaf_extended_name_borrow(&objectName));
+    object = findObjectInfo(dn);
+    if (object) {
+      objects.insert(object);
+      classInstances[object->class_id].insert(object);
+    } else {
+      LOG_WA("%s cannot be found in PBE cache", dn.c_str());
+      goto bailout;
+    }
+
     ++nrofDeletes;
   } while (true);
 
+  if (!deleteObjectList(dbHandle, objects, NULL)) {
+    LOG_ER("Failed to delete object.");
+    goto bailout;
+  }
+
+  if (!deleteClassInstances(dbHandle, classInstances)) {
+    LOG_ER("Failed to delete instance of a class");
+    goto bailout;
+  }
+
+  removeObjects(objects);
+
   if (SA_AIS_ERR_NOT_EXIST != errorCode) {
     LOG_ER("Failed in saImmOmSearchNext_2:%u - exiting", errorCode);
     goto bailout;
@@ -2180,163 +2559,32 @@ bailout:
 
 void objectDeleteToPBE(std::string objectNameString, void *db_handle) {
   sqlite3 *dbHandle = (sqlite3 *)db_handle;
-  sqlite3_stmt *stmt;
-  std::string sql("delete from \"");
-
-  int rc = 0;
-  char *zErr = NULL;
-  std::string object_id_str;
-  int object_id;
-  int class_id;
-  std::string class_name;
+  ObjectSet objects;
+  ClassInstanceMap classInstances;
   bool badfile = false;
   TRACE_ENTER();
   assert(dbHandle);
 
-  /* First, look up obj_id and class_id  from objects where dn == objname. */
-  stmt = preparedStmt[SQL_SEL_OBJECTS_DN];
-  if ((rc = sqlite3_bind_text(stmt, 1, objectNameString.c_str(), -1, NULL)) !=
-      SQLITE_OK) {
-    LOG_ER("Failed to bind dn with error code: %d", rc);
-    goto bailout;
-  }
-  rc = sqlite3_step(stmt);
-  if (rc == SQLITE_DONE) {
-    LOG_ER("Expected 1 row got 0 rows (line: %u)", __LINE__);
-    badfile = true;
-    goto bailout;
-  }
-  if (rc != SQLITE_ROW) {
-    LOG_ER("Could not access object '%s' for delete, error:%s",
-           objectNameString.c_str(), sqlite3_errmsg(dbHandle));
-    badfile = true;
-    goto bailout;
-  }
-
-  object_id = sqlite3_column_int(stmt, 0);
-  object_id_str.append((char *)sqlite3_column_text(stmt, 0));
-  class_id = sqlite3_column_int(stmt, 1);
 
-  if (sqlite3_step(stmt) == SQLITE_ROW) {
-    LOG_ER("Expected 1 row got more then 1 row (line: %u)", __LINE__);
-    badfile = true;
+  collectObjectInfo(objectNameString, objects, classInstances);
+  if (!objects.size()) {
+    LOG_ER("Object %s does not exist. Possible corrupted data",
+        objectNameString.c_str());
     goto bailout;
   }
-  sqlite3_reset(stmt);
 
-  TRACE_2("Successfully accessed object '%s'.", objectNameString.c_str());
-  TRACE_2("object_id:%d class_id:%d", object_id, class_id);
-
-  /*
-    Second, delete the root object tuple in objects for obj_id.
-  */
-  stmt = preparedStmt[SQL_DEL_OBJECTS];
-  if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
-    LOG_ER("Failed to bind obj_id with error code: %d", rc);
-    goto bailout;
-  }
-  rc = sqlite3_step(stmt);
-  if (rc != SQLITE_DONE) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s",
-           preparedSql[SQL_DEL_OBJECTS], sqlite3_errmsg(dbHandle));
+  if (!deleteObjectList(dbHandle, objects, &badfile)) {
+    LOG_ER("Failed to delete object.");
     goto bailout;
   }
-  sqlite3_reset(stmt);
-  TRACE("Deleted %u values", sqlite3_changes(dbHandle));
 
-  /* Third get the class-name for the object */
-  stmt = preparedStmt[SQL_SEL_CLASSES_ID];
-  if ((rc = sqlite3_bind_int(stmt, 1, class_id)) != SQLITE_OK) {
-    LOG_ER("Failed to bind class_id with error code: %d", rc);
+  if (!deleteClassInstances(dbHandle, classInstances)) {
+    LOG_ER("Failed to delete instance of a class");
     goto bailout;
   }
-  rc = sqlite3_step(stmt);
-  if (rc == SQLITE_DONE) {
-    LOG_ER("Expected 1 row got 0 rows (line: %u)", __LINE__);
-    badfile = true;
-    goto bailout;
-  }
-  if (rc != SQLITE_ROW) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s",
-           preparedSql[SQL_SEL_CLASSES_ID], sqlite3_errmsg(dbHandle));
-    goto bailout;
-  }
-
-  class_name.append((char *)sqlite3_column_text(stmt, 0));
-
-  if (sqlite3_step(stmt) == SQLITE_ROW) {
-    LOG_ER("Expected 1 row got more then 1 row (line: %u)", __LINE__);
-    badfile = true;
-    goto bailout;
-  }
-  sqlite3_reset(stmt);
 
-  TRACE_2("Successfully accessed classes class_id:%d class_name:'%s'", 
class_id,
-          class_name.c_str());
+  removeObjects(objects);
 
-  /* Fourth delete the base attribute tuple from table 'classname' for obj_id.
-   */
-  sql.append(class_name);
-  sql.append("\" where obj_id = ");
-  sql.append(object_id_str);
-
-  TRACE("GENERATED 4:%s", sql.c_str());
-  rc = sqlite3_exec(dbHandle, sql.c_str(), NULL, NULL, &zErr);
-  if (rc) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s", sql.c_str(), zErr);
-    sqlite3_free(zErr);
-    goto bailout;
-  }
-
-  TRACE("Deleted %u values", sqlite3_changes(dbHandle));
-
-  /*
-    Fifth delete from objects_int_multi, objects_real_multi, objects_text_multi
-    where obj_id ==OBJ_ID
-   */
-  stmt = preparedStmt[SQL_DEL_OBJ_INT_MULTI_ID];
-  if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
-    LOG_ER("Failed to bind obj_id with error code: %d", rc);
-    goto bailout;
-  }
-  rc = sqlite3_step(stmt);
-  if (rc != SQLITE_DONE) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s",
-           preparedSql[SQL_DEL_OBJ_INT_MULTI_ID], sqlite3_errmsg(dbHandle));
-    goto bailout;
-  }
-  TRACE("Deleted %u values", sqlite3_changes(dbHandle));
-  sqlite3_reset(stmt);
-
-  stmt = preparedStmt[SQL_DEL_OBJ_REAL_MULTI_ID];
-  if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
-    LOG_ER("Failed to bind obj_id with error code: %d", rc);
-    goto bailout;
-  }
-  rc = sqlite3_step(stmt);
-  if (rc != SQLITE_DONE) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s",
-           preparedSql[SQL_DEL_OBJ_REAL_MULTI_ID], sqlite3_errmsg(dbHandle));
-    goto bailout;
-  }
-  TRACE("Deleted %u values", sqlite3_changes(dbHandle));
-  sqlite3_reset(stmt);
-
-  stmt = preparedStmt[SQL_DEL_OBJ_TEXT_MULTI_ID];
-  if ((rc = sqlite3_bind_int(stmt, 1, object_id)) != SQLITE_OK) {
-    LOG_ER("Failed to bind obj_id with error code: %d", rc);
-    goto bailout;
-  }
-  rc = sqlite3_step(stmt);
-  if (rc != SQLITE_DONE) {
-    LOG_ER("SQL statement ('%s') failed because:\n %s",
-           preparedSql[SQL_DEL_OBJ_TEXT_MULTI_ID], sqlite3_errmsg(dbHandle));
-    goto bailout;
-  }
-  TRACE("Deleted %u values", sqlite3_changes(dbHandle));
-  sqlite3_reset(stmt);
-
-  TRACE_LEAVE();
   return;
 
 bailout:
@@ -2486,6 +2734,8 @@ bool objectToPBE(std::string objectNameString, const 
SaImmAttrValuesT_2 **attrs,
   sqlite3_reset(stmt);
   sqlite3_clear_bindings(stmt);
 
+  reverseAndInsertDn(objectNameString, object_id, class_id);
+
   TRACE_LEAVE();
   return true;
 bailout:
diff --git a/src/imm/immnd/ImmModel.cc b/src/imm/immnd/ImmModel.cc
index c539fda..f7c8fc0 100644
--- a/src/imm/immnd/ImmModel.cc
+++ b/src/imm/immnd/ImmModel.cc
@@ -2532,6 +2532,7 @@ void ImmModel::pbePrtoPurgeMutations(unsigned int nodeId,
   ContinuationMap2::iterator ci;
   ImmAttrValueMap::iterator oavi;
   ObjectInfo* afim = NULL;
+  std::set<SaUint32T> connSet;
   TRACE_ENTER();
   bool dummy = false;
   bool dummy2 = false;
@@ -2560,7 +2561,11 @@ void ImmModel::pbePrtoPurgeMutations(unsigned int nodeId,
            be the only proper reply, so we let the client
            timeout by not replying.
         */
-        connVector.push_back(ci->second.mConn);
+        if(connSet.find(ci->second.mConn) == connSet.end()) {
+          /* Don't add a connection more that once */
+          connSet.insert(ci->second.mConn);
+          connVector.push_back(ci->second.mConn);
+        }
       }
       sPbeRtReqContinuationMap.erase(ci);
     }
@@ -10440,7 +10445,8 @@ SaAisErrorT ImmModel::ccbObjectDelete(
 
     err = deleteObject(oi, reqConn, adminOwner, ccb, doIt, objNameVector,
                        connVector, continuations,
-                       pbeConnPtr ? (*pbeConnPtr) : 0, &readLockedObject);
+                       pbeConnPtr ? (*pbeConnPtr) : 0, &readLockedObject,
+                       true);
 
     if (err == SA_AIS_OK && readLockedObject != NULL) {
       safeReadObjSet.insert(readLockedObject);
@@ -10462,7 +10468,8 @@ SaAisErrorT ImmModel::ccbObjectDelete(
           --childCount;
           err = deleteObject(oi2, reqConn, adminOwner, ccb, doIt, 
objNameVector,
                              connVector, continuations,
-                             pbeConnPtr ? (*pbeConnPtr) : 0, 
&readLockedObject);
+                             pbeConnPtr ? (*pbeConnPtr) : 0, &readLockedObject,
+                             false);
           if (err == SA_AIS_OK && readLockedObject != NULL) {
             safeReadObjSet.insert(readLockedObject);
           }
@@ -10493,7 +10500,8 @@ SaAisErrorT ImmModel::deleteObject(ObjectMap::iterator& 
oi, SaUint32T reqConn,
                                    ConnVector& connVector,
                                    IdVector& continuations,
                                    unsigned int pbeIsLocal,
-                                   ObjectInfo** readLockedObject) {
+                                   ObjectInfo** readLockedObject,
+                                   bool sendToPbe) {
   /*TRACE_ENTER();*/
   bool configObj = true;
   std::string objAdminOwnerName;
@@ -10794,7 +10802,7 @@ SaAisErrorT ImmModel::deleteObject(ObjectMap::iterator& 
oi, SaUint32T reqConn,
 
     if (nonPersistentRto) {
       TRACE_7("Not incrementing op-count for ccb delete of non-persistent 
RTO");
-    } else {
+    } else if (sendToPbe) {
       ccb->mOpCount++;
     }
 
diff --git a/src/imm/immnd/ImmModel.h b/src/imm/immnd/ImmModel.h
index fae308c..c8b0f4e 100644
--- a/src/imm/immnd/ImmModel.h
+++ b/src/imm/immnd/ImmModel.h
@@ -219,7 +219,8 @@ class ImmModel {
                            ObjectNameVector& objNameVector,
                            ConnVector& connVector, IdVector& continuations,
                            unsigned int pbeIsLocal,
-                           ObjectInfo** readLockedObject);
+                           ObjectInfo** readLockedObject,
+                           bool sendToPbe);
 
   void setCcbErrorString(CcbInfo* ccb, const char* errorString, va_list vl);
 
diff --git a/src/imm/immnd/immnd_evt.c b/src/imm/immnd/immnd_evt.c
index 228b7dd..128056d 100644
--- a/src/imm/immnd/immnd_evt.c
+++ b/src/imm/immnd/immnd_evt.c
@@ -8298,15 +8298,6 @@ static void immnd_evt_proc_object_delete(IMMND_CB *cb, 
IMMND_EVT *evt,
                osafassert(pbeNodeId);
                osafassert(pbeNodeId == cb->node_id);
                implHandle = m_IMMSV_PACK_HANDLE(pbeConn, pbeNodeId);
-               memset(&send_evt, '\0', sizeof(IMMSV_EVT));
-               send_evt.type = IMMSV_EVT_TYPE_IMMA;
-               /* PBE is internal => can handle long DNs */
-               send_evt.info.imma.type = IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
-               send_evt.info.imma.info.objDelete.ccbId =
-                   evt->info.objDelete.ccbId;
-               send_evt.info.imma.info.objDelete.immHandle = implHandle;
-               send_evt.info.imma.info.objDelete.adminOwnerId =
-                   0; /* No reply!*/
 
                /*Fetch client node for PBE */
                immnd_client_node_get(cb, implHandle, &oi_cl_node);
@@ -8322,7 +8313,11 @@ static void immnd_evt_proc_object_delete(IMMND_CB *cb, 
IMMND_EVT *evt,
                        err = SA_AIS_ERR_FAILED_OPERATION;
                        immnd_proc_global_abort_ccb(cb,
                                                    evt->info.objDelete.ccbId);
-               } else {
+               } else if (arrSize > 0) {
+                       /* If arrSize == 0, it means that the object has been 
already
+                       * deleted, and in that case we don't need to send 
anything
+                       * to PBE
+                       */
                        /* We have obtained PBE handle & dest info for PBE.
                           Iterate through objNameArray and send delete upcalls
                           to PBE. PBE delete upcalls are generated for all
@@ -8331,28 +8326,29 @@ static void immnd_evt_proc_object_delete(IMMND_CB *cb, 
IMMND_EVT *evt,
                           upcalls are generated for cached non-persistent
                           runtime objects that are delete as a side effect.
                         */
-                       int ix = 0;
-                       for (; ix < arrSize && err == SA_AIS_OK; ++ix) {
-                               send_evt.info.imma.info.objDelete.objectName
-                                   .size =
-                                   (SaUint32T)strlen(objNameArr[ix]) + 1;
-                               send_evt.info.imma.info.objDelete.objectName
-                                   .buf = objNameArr[ix];
+                       /* PBE will handle children objects */
+                       memset(&send_evt, '\0', sizeof(IMMSV_EVT));
+                       send_evt.type = IMMSV_EVT_TYPE_IMMA;
+                       /* PBE is internal => can handle long DNs */
+                       send_evt.info.imma.type = 
IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
+                       send_evt.info.imma.info.objDelete.ccbId =
+                                       evt->info.objDelete.ccbId;
+                       send_evt.info.imma.info.objDelete.immHandle = 
implHandle;
+                       send_evt.info.imma.info.objDelete.adminOwnerId = 0; /* 
No reply!*/
+                       send_evt.info.imma.info.objDelete.objectName.size =
+                                       evt->info.objDelete.objectName.size;
+                       send_evt.info.imma.info.objDelete.objectName.buf =
+                                       evt->info.objDelete.objectName.buf;
 
-                               TRACE_2(
-                                   "MAKING PBE-IMPLEMENTER OBJ DELETE upcall");
-                               if (immnd_mds_msg_send(
-                                       cb, NCSMDS_SVC_ID_IMMA_OI,
-                                       oi_cl_node->agent_mds_dest,
-                                       &send_evt) != NCSCC_RC_SUCCESS) {
-                                       LOG_ER(
-                                           "Immnd upcall over MDS for 
ccbObjectDelete "
-                                           "to PBE failed! - aborting ccb %u",
-                                           evt->info.objDelete.ccbId);
-                                       err = SA_AIS_ERR_FAILED_OPERATION;
-                                       immnd_proc_global_abort_ccb(
-                                           cb, evt->info.objDelete.ccbId);
-                               }
+                       TRACE_2("MAKING PBE-IMPLEMENTER OBJ DELETE upcall");
+                       if (immnd_mds_msg_send(cb, NCSMDS_SVC_ID_IMMA_OI,
+                                                               
oi_cl_node->agent_mds_dest,
+                                                               &send_evt) != 
NCSCC_RC_SUCCESS) {
+                               LOG_ER("Immnd upcall over MDS for 
ccbObjectDelete "
+                                               "to PBE failed! - aborting ccb 
%u",
+                                               evt->info.objDelete.ccbId);
+                               err = SA_AIS_ERR_FAILED_OPERATION;
+                               immnd_proc_global_abort_ccb(cb, 
evt->info.objDelete.ccbId);
                        }
                }
        } /* End of PersistentBackEnd handling. */
@@ -8717,16 +8713,6 @@ static void immnd_evt_proc_rt_object_delete(IMMND_CB 
*cb, IMMND_EVT *evt,
                        osafassert(cb->mIsCoord);
                        osafassert(pbeNodeId == cb->node_id);
                        implHandle = m_IMMSV_PACK_HANDLE(pbeConn, pbeNodeId);
-                       memset(&send_evt, '\0', sizeof(IMMSV_EVT));
-                       send_evt.type = IMMSV_EVT_TYPE_IMMA;
-                       /* PBE is internal => can handle long DNs */
-                       send_evt.info.imma.type =
-                           IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
-                       send_evt.info.imma.info.objDelete.ccbId = 0;
-                       send_evt.info.imma.info.objDelete.adminOwnerId =
-                           continuationId;
-                       send_evt.info.imma.info.objDelete.immHandle =
-                           implHandle;
 
                        /*Fetch client node for PBE */
                        immnd_client_node_get(cb, implHandle, &pbe_cl_node);
@@ -8743,52 +8729,56 @@ static void immnd_evt_proc_rt_object_delete(IMMND_CB 
*cb, IMMND_EVT *evt,
                                goto done;
                        } else {
                                /* We have obtained PBE handle & dest info for
-                                  PBE. Iterate through objNameArray and send
-                                  delete upcalls to PBE.
+                                  PBE. Send delete upcalls to PBE.
                                */
-                               int ix = 0;
-                               for (; ix < arrSize && err == SA_AIS_OK; ++ix) {
-                                       send_evt.info.imma.info.objDelete
-                                           .objectName.size =
-                                           (SaUint32T)strlen(objNameArr[ix]) +
-                                           1;
-                                       send_evt.info.imma.info.objDelete
-                                           .objectName.buf = objNameArr[ix];
-
-                                       TRACE_2(
-                                           "MAKING PBE-IMPLEMENTER PERSISTENT 
RT-OBJ DELETE upcalls");
-                                       if (immnd_mds_msg_send(
-                                               cb, NCSMDS_SVC_ID_IMMA_OI,
-                                               pbe_cl_node->agent_mds_dest,
-                                               &send_evt) !=
-                                           NCSCC_RC_SUCCESS) {
-                                               LOG_WA(
-                                                   "Upcall over MDS for 
persistent rt obj delete "
-                                                   "to PBE failed!");
-                                               /* TODO: we could possibly
-                                                  revert the delete here an
-                                                  return TRY_AGAIN. We may have
-                                                  succeeded in sending some
-                                                  deletes, but since we did not
-                                                  send the completed, the PRTO
-                                                  deletes will not be commited
-                                                  by the PBE.
-                                                */
-                                               goto done;
-                                       }
+                               memset(&send_evt, '\0', sizeof(IMMSV_EVT));
+                               send_evt.type = IMMSV_EVT_TYPE_IMMA;
+                               /* PBE is internal => can handle long DNs */
+                               send_evt.info.imma.type = 
IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
+                               send_evt.info.imma.info.objDelete.ccbId = 0;
+                               send_evt.info.imma.info.objDelete.adminOwnerId =
+                                               continuationId;
+                               send_evt.info.imma.info.objDelete.immHandle = 
implHandle;
+
+                               
send_evt.info.imma.info.objDelete.objectName.size =
+                                               
evt->info.objDelete.objectName.size;
+                               
send_evt.info.imma.info.objDelete.objectName.buf =
+                                               
evt->info.objDelete.objectName.buf;
+
+                               if (immnd_mds_msg_send(cb, 
NCSMDS_SVC_ID_IMMA_OI,
+                                                                       
pbe_cl_node->agent_mds_dest,
+                                                                       
&send_evt) !=
+                                                                               
        NCSCC_RC_SUCCESS) {
+                                       LOG_WA("Upcall over MDS for persistent 
rt obj delete "
+                                                       "to PBE failed!");
+                                       /* TODO: we could possibly
+                                          revert the delete here an
+                                          return TRY_AGAIN. We may have
+                                          succeeded in sending some
+                                          deletes, but since we did not
+                                          send the completed, the PRTO
+                                          deletes will not be commited
+                                          by the PBE.
+                                       */
+                                       goto done;
                                }
 
+                               memset(&send_evt, '\0', sizeof(IMMSV_EVT));
                                send_evt.info.imma.type =
                                    IMMA_EVT_ND2A_OI_CCB_COMPLETED_UC;
                                send_evt.info.imma.info.ccbCompl.ccbId = 0;
                                send_evt.info.imma.info.ccbCompl.immHandle =
                                    implHandle;
                                send_evt.info.imma.info.ccbCompl.implId =
-                                   arrSize;
+                                   1;
                                /* ^^Hack: Use implId to store objCount, see
                                   #1809.^^ This avoids having to change the
                                   protocol.
                                 */
+                               /* It will be always 1 for number of delete 
objects.
+                                * Cascade delete is done on PBE size, and only
+                                * parent DN is sent to PBE
+                                */
                                send_evt.info.imma.info.ccbCompl.invocation =
                                    continuationId;
 
@@ -8818,17 +8808,6 @@ static void immnd_evt_proc_rt_object_delete(IMMND_CB 
*cb, IMMND_EVT *evt,
                        implHandle =
                            m_IMMSV_PACK_HANDLE(pbe2BConn, cb->node_id);
 
-                       memset(&send_evt, '\0', sizeof(IMMSV_EVT));
-                       send_evt.type = IMMSV_EVT_TYPE_IMMA;
-
-                       send_evt.info.imma.type =
-                           IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
-                       send_evt.info.imma.info.objDelete.ccbId = 0;
-                       send_evt.info.imma.info.objDelete.adminOwnerId =
-                           continuationId;
-                       send_evt.info.imma.info.objDelete.immHandle =
-                           implHandle;
-
                        /*Fetch client node for Slave PBE */
                        immnd_client_node_get(cb, implHandle, &pbe_cl_node);
                        osafassert(pbe_cl_node);
@@ -8838,39 +8817,38 @@ static void immnd_evt_proc_rt_object_delete(IMMND_CB 
*cb, IMMND_EVT *evt,
                                goto done;
                        } else {
                                /* We have obtained handle & dest info for Slave
-                                  PBE. Iterate through objNameArray and send
-                                  delete upcalls to Slave PBE.
+                                  PBE. Send delete upcalls to Slave PBE.
                                */
-                               int ix = 0;
-                               for (; ix < arrSize && err == SA_AIS_OK; ++ix) {
-                                       send_evt.info.imma.info.objDelete
-                                           .objectName.size =
-                                           (SaUint32T)strlen(objNameArr[ix]) +
-                                           1;
-                                       send_evt.info.imma.info.objDelete
-                                           .objectName.buf = objNameArr[ix];
+                               memset(&send_evt, '\0', sizeof(IMMSV_EVT));
+                               send_evt.type = IMMSV_EVT_TYPE_IMMA;
 
-                                       TRACE_2(
-                                           "MAKING PBE-SLAVE PERSISTENT RT-OBJ 
DELETE upcalls");
-                                       if (immnd_mds_msg_send(
-                                               cb, NCSMDS_SVC_ID_IMMA_OI,
-                                               pbe_cl_node->agent_mds_dest,
-                                               &send_evt) !=
-                                           NCSCC_RC_SUCCESS) {
-                                               LOG_WA(
-                                                   "Upcall over MDS for 
persistent rt obj delete "
-                                                   "to Slave PBE failed!");
-                                               /* TODO: we could possibly
-                                                  revert the delete here an
-                                                  return TRY_AGAIN. We may have
-                                                  succeeded in sending some
-                                                  deletes, but since we did not
-                                                  send the completed, the PRTO
-                                                  deletes will not be commited
-                                                  by the PBE.
-                                                */
-                                               goto done;
-                                       }
+                               send_evt.info.imma.type = 
IMMA_EVT_ND2A_OI_OBJ_DELETE_UC;
+                               send_evt.info.imma.info.objDelete.ccbId = 0;
+                               send_evt.info.imma.info.objDelete.adminOwnerId =
+                                               continuationId;
+                               send_evt.info.imma.info.objDelete.immHandle = 
implHandle;
+                               
send_evt.info.imma.info.objDelete.objectName.size =
+                                               
evt->info.objDelete.objectName.size;
+                               
send_evt.info.imma.info.objDelete.objectName.buf =
+                                               
evt->info.objDelete.objectName.buf;
+
+                               TRACE_2("MAKING PBE-SLAVE PERSISTENT RT-OBJ 
DELETE upcalls");
+                               if (immnd_mds_msg_send(cb, 
NCSMDS_SVC_ID_IMMA_OI,
+                                                                       
pbe_cl_node->agent_mds_dest,
+                                                                       
&send_evt) !=
+                                                                               
        NCSCC_RC_SUCCESS) {
+                                       LOG_WA("Upcall over MDS for persistent 
rt obj delete "
+                                                       "to Slave PBE failed!");
+                                       /* TODO: we could possibly
+                                          revert the delete here an
+                                          return TRY_AGAIN. We may have
+                                          succeeded in sending some
+                                          deletes, but since we did not
+                                          send the completed, the PRTO
+                                          deletes will not be commited
+                                          by the PBE.
+                                        */
+                                       goto done;
                                }
                        }
                        implHandle = 0LL;
-- 
1.9.1


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to