- add create and set (if previous value matches) functions to KeyValue class
- add Consensus::MonitorTakeoverRequest() function for use by RDE to answer
  takeover requests
- add Consensus::CreateTakeoverRequest() - before an SC is promoted to active,
  it will create a takeover request in the KV store. An existing SC can reject
  the lock takeover
---
 src/osaf/consensus/consensus.cc | 435 ++++++++++++++++++++++++++++++++++------
 src/osaf/consensus/consensus.h  |  55 ++++-
 src/osaf/consensus/key_value.cc | 105 +++++++---
 src/osaf/consensus/key_value.h  |  19 +-
 4 files changed, 511 insertions(+), 103 deletions(-)

diff --git a/src/osaf/consensus/consensus.cc b/src/osaf/consensus/consensus.cc
index cc04f3518..f2245dd01 100644
--- a/src/osaf/consensus/consensus.cc
+++ b/src/osaf/consensus/consensus.cc
@@ -15,13 +15,17 @@
 #include "osaf/consensus/consensus.h"
 #include <unistd.h>
 #include <climits>
+#include <sstream>
 #include <thread>
 #include "base/conf.h"
 #include "base/getenv.h"
 #include "base/logtrace.h"
 #include "base/ncssysf_def.h"
 
-SaAisErrorT Consensus::PromoteThisNode() {
+const std::string Consensus::kTakeoverRequestKeyname = "takeover_request";
+
+SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover,
+    const uint64_t cluster_size) {
   TRACE_ENTER();
   SaAisErrorT rc;
 
@@ -29,6 +33,10 @@ SaAisErrorT Consensus::PromoteThisNode() {
     return SA_AIS_OK;
   }
 
+  // check if there is an existing takeover request; we cannot
+  // attempt to lock until that is complete
+  CheckForExistingTakeoverRequest();
+
   uint32_t retries = 0;
   rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout);
   while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
@@ -39,33 +47,30 @@ SaAisErrorT Consensus::PromoteThisNode() {
   }
 
   if (rc == SA_AIS_ERR_EXIST) {
+    // there's a chance the lock has been released since the lock attempt
     // get the current active controller
-    std::string current_active("");
-    retries = 0;
-    rc = KeyValue::LockOwner(current_active);
-    while (rc != SA_AIS_OK && retries < kMaxRetry) {
-      ++retries;
-      std::this_thread::sleep_for(kSleepInterval);
-      rc = KeyValue::LockOwner(current_active);
-    }
-    if (rc != SA_AIS_OK) {
-      LOG_ER("Failed to get current lock owner. Will attempt to lock anyway");
+    std::string current_active = CurrentActive();
+
+    if (current_active.empty() == true) {
+      LOG_WA("Failed to get current lock owner. Will attempt to lock anyway");
     }
 
+    bool take_over_request_created = false;
     LOG_NO("Current active controller is %s", current_active.c_str());
 
-    // there's a chance the lock has been released since the lock attempt
     if (current_active.empty() == false) {
-      // remove current active controller's lock and fence it
-      retries = 0;
-      rc = KeyValue::Unlock(current_active);
-      while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
-        LOG_IN("Trying to unlock");
-        ++retries;
-        std::this_thread::sleep_for(kSleepInterval);
-        rc = KeyValue::Unlock(current_active);
+      if (graceful_takeover == true) {
+        rc = CreateTakeoverRequest(current_active, base::Conf::NodeName(),
+                                   cluster_size);
+        if (rc != SA_AIS_OK) {
+          LOG_WA("Takeover request failed (%d)", rc);
+          return rc;
+        }
+        take_over_request_created = true;
       }
 
+      // remove current active controller's lock and fence it
+      rc = Demote(current_active);
       if (rc == SA_AIS_OK) {
         FenceNode(current_active);
       } else {
@@ -82,6 +87,23 @@ SaAisErrorT Consensus::PromoteThisNode() {
       std::this_thread::sleep_for(kSleepInterval);
       rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout);
     }
+
+    if (take_over_request_created == true) {
+      SaAisErrorT rc1;
+
+      // remove takeover request
+      rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
+      retries = 0;
+      while (rc1 != SA_AIS_OK && retries < kMaxRetry) {
+        ++retries;
+        std::this_thread::sleep_for(kSleepInterval);
+        rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
+      }
+
+      if (rc1 != SA_AIS_OK) {
+        LOG_WA("Could not remove takeover request");
+      }
+    }
   }
 
   if (rc == SA_AIS_OK) {
@@ -93,43 +115,23 @@ SaAisErrorT Consensus::PromoteThisNode() {
   return rc;
 }
 
-SaAisErrorT Consensus::Demote(const std::string& node = "") {
+SaAisErrorT Consensus::Demote(const std::string& node) {
   TRACE_ENTER();
   if (use_consensus_ == false) {
     return SA_AIS_OK;
   }
 
-  SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION;
-  uint32_t retries = 0;
-
-  // check current active node
-  std::string current_active;
-  rc = KeyValue::LockOwner(current_active);
-  while (rc != SA_AIS_OK && retries < kMaxRetry) {
-    ++retries;
-    std::this_thread::sleep_for(kSleepInterval);
-    rc = KeyValue::LockOwner(current_active);
-  }
-
-  if (rc != SA_AIS_OK) {
-    LOG_ER("Failed to get lock owner");
-    return rc;
-  }
-
-  LOG_NO("Demoting %s as active controller", current_active.c_str());
+  osafassert(node.empty() == false);
 
-  if (node.empty() == false && node != current_active) {
-    // node is not the current active controller!
-    osafassert(false);
-  }
+  SaAisErrorT rc;
+  uint32_t retries = 0;
 
-  retries = 0;
-  rc = KeyValue::Unlock(current_active);
+  rc = KeyValue::Unlock(node);
   while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
     LOG_IN("Trying to unlock");
     ++retries;
     std::this_thread::sleep_for(kSleepInterval);
-    rc = KeyValue::Unlock(current_active);
+    rc = KeyValue::Unlock(node);
   }
 
   if (rc != SA_AIS_OK) {
@@ -143,7 +145,17 @@ SaAisErrorT Consensus::Demote(const std::string& node = "") {
 
 SaAisErrorT Consensus::DemoteCurrentActive() {
   TRACE_ENTER();
-  return Demote();
+
+  // check current active node
+  std::string current_active = CurrentActive();
+  if (current_active.empty() == true) {
+    LOG_ER("Failed to get lock owner");
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+
+  LOG_NO("Demoting %s as active controller", current_active.c_str());
+
+  return Demote(current_active);
 }
 
 SaAisErrorT Consensus::DemoteThisNode() {
@@ -151,9 +163,7 @@ SaAisErrorT Consensus::DemoteThisNode() {
   return Demote(base::Conf::NodeName());
 }
 
-bool Consensus::IsEnabled() const {
-  return use_consensus_;
-}
+bool Consensus::IsEnabled() const { return use_consensus_; }
 
 bool Consensus::IsWritable() const {
   TRACE_ENTER();
@@ -178,9 +188,7 @@ bool Consensus::IsWritable() const {
   }
 }
 
-bool Consensus::IsRemoteFencingEnabled() const {
-  return use_remote_fencing_;
-}
+bool Consensus::IsRemoteFencingEnabled() const { return use_remote_fencing_; }
 
 std::string Consensus::CurrentActive() const {
   TRACE_ENTER();
@@ -188,7 +196,7 @@ std::string Consensus::CurrentActive() const {
     return "";
   }
 
-  SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION;
+  SaAisErrorT rc;
   uint32_t retries = 0;
   std::string owner;
 
@@ -212,7 +220,7 @@ Consensus::Consensus() {
 
   uint32_t split_brain_enable = base::GetEnv("FMS_SPLIT_BRAIN_PREVENTION", 0);
   std::string kv_store_cmd = base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING" , 0);
+  uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING", 0);
 
   if (split_brain_enable == 1 && kv_store_cmd.empty() == false) {
     use_consensus_ = true;
@@ -228,16 +236,14 @@ Consensus::Consensus() {
   base::Conf::InitNodeName();
 }
 
-Consensus::~Consensus() {
-}
+Consensus::~Consensus() {}
 
 bool Consensus::FenceNode(const std::string& node) {
   if (use_remote_fencing_ == true) {
     LOG_WA("Fencing remote node %s", node.c_str());
     // @todo currently passing UINT_MAX as node ID, since
     // we can't always obtain a valid node ID?
-    opensaf_reboot(UINT_MAX, node.c_str(),
-      "Fencing remote node");
+    opensaf_reboot(UINT_MAX, node.c_str(), "Fencing remote node");
 
     return true;
   } else {
@@ -247,7 +253,7 @@ bool Consensus::FenceNode(const std::string& node) {
 }
 
 void Consensus::MonitorLock(ConsensusCallback callback,
-  const uint32_t user_defined) {
+                            const uint32_t user_defined) {
   TRACE_ENTER();
   if (use_consensus_ == false) {
     return;
@@ -255,3 +261,312 @@ void Consensus::MonitorLock(ConsensusCallback callback,
 
   KeyValue::WatchLock(callback, user_defined);
 }
+
+void Consensus::MonitorTakeoverRequest(ConsensusCallback callback,
+                                       const uint32_t user_defined) {
+  TRACE_ENTER();
+  if (use_consensus_ == false) {
+    return;
+  }
+
+  KeyValue::Watch(kTakeoverRequestKeyname, callback, user_defined);
+}
+
+void Consensus::CheckForExistingTakeoverRequest() {
+  SaAisErrorT rc;
+  std::vector<std::string> tokens;
+  rc = ReadTakeoverRequest(tokens);
+
+  if (rc != SA_AIS_OK) {
+    return;
+  }
+
+  // get expiration
+  const uint64_t expiration_timestamp = strtoull(
+      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(),
+      0, 10);
+
+  // wait until expiration is over
+  int64_t expiration = expiration_timestamp - CurrentTime();
+  if (expiration > 0 && expiration <= kTakeoverValidTime) {
+    // @todo check if the takeover request has been deleted already?
+    LOG_NO("A takeover request is in progress"
+           " (expiring in %" PRId64 " seconds)",
+           expiration);
+    std::chrono::seconds sleep_duration(expiration);
+    std::this_thread::sleep_for(sleep_duration);
+  } else {
+    LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
+  }
+}
+
+SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner,
+                                             const std::string& proposed_owner,
+                                             const uint64_t cluster_size) {
+  TRACE_ENTER();
+
+  // Format of takeover request:
+  // "expiration_time<space>current_owner<space>proposed_owner<space>
+  // proposed_owner_cluster_size<space>status"
+  // status := [UNDEFINED, NEW, REJECTED, ACCEPTED]
+
+  std::string takeover_request;
+  // request to expire in 20 seconds
+  uint64_t timestamp = CurrentTime() + kTakeoverValidTime;
+
+  takeover_request =
+      std::to_string(timestamp) + " " + current_owner + " " +
+      base::Conf::NodeName() + " " + std::to_string(cluster_size) + " " +
+      TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
+
+  TRACE("Takeover request: \"%s\"", takeover_request.c_str());
+
+  SaAisErrorT rc;
+  uint32_t retries = 0;
+  rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+  while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+  }
+
+  if (rc == SA_AIS_ERR_EXIST) {
+    LOG_NO("Existing takeover request found");
+
+    // retrieve takeover request
+    std::vector<std::string> tokens;
+    retries = 0;
+    rc = ReadTakeoverRequest(tokens);
+    while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+      ++retries;
+      std::this_thread::sleep_for(kSleepInterval);
+      rc = ReadTakeoverRequest(tokens);
+    }
+
+    if (rc == SA_AIS_OK) {
+      // get expiration
+      const uint64_t expiration_timestamp = strtoull(
+        tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(),
+        0, 10);
+
+      // wait until expiration is over
+      int64_t expiration = expiration_timestamp - CurrentTime();
+      if (expiration > 0 && expiration <= kTakeoverValidTime) {
+        LOG_NO("A takeover request is in progress"
+               " (expiring in %" PRId64 " seconds)",
+               expiration);
+        std::chrono::seconds sleep_duration(expiration);
+        std::this_thread::sleep_for(sleep_duration);
+      } else {
+        LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
+      }
+    }  // else remove it anyway
+
+    LOG_NO("Remove expired takeover request");
+
+    // remove expired request
+    retries = 0;
+    rc = KeyValue::Erase(kTakeoverRequestKeyname);
+    while (rc != SA_AIS_OK && retries < kMaxRetry) {
+      ++retries;
+      std::this_thread::sleep_for(kSleepInterval);
+      rc = KeyValue::Erase(kTakeoverRequestKeyname);
+    }
+
+    if (rc == SA_AIS_OK) {
+      return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size);
+    } else {
+      LOG_ER("Could not remove existing takeover request");
+      return SA_AIS_ERR_EXIST;
+    }
+  }
+
+  // wait up to 10s for request to be answered
+  retries = 0;
+  while (retries < kMaxTakeoverRetry) {
+    std::vector<std::string> tokens;
+    if (ReadTakeoverRequest(tokens) == SA_AIS_OK) {
+      const std::string state =
+          tokens[static_cast<std::uint8_t>(TakeoverElements::STATE)];
+      const std::string _proposed_owner =
+          tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)];
+
+      if (proposed_owner != _proposed_owner) {
+        LOG_ER("Takeover request was not created by us! (%s)",
+               _proposed_owner.c_str());
+        rc = SA_AIS_ERR_EXIST;
+        break;
+      }
+
+      if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                       TakeoverState::REJECTED)]) {
+        LOG_NO("Takeover request rejected");
+        rc = SA_AIS_ERR_EXIST;
+        break;
+      } else if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                              TakeoverState::ACCEPTED)]) {
+        LOG_NO("Takeover request accepted");
+        rc = SA_AIS_OK;
+        break;
+      } else if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                              TakeoverState::NEW)]) {
+        TRACE("Waiting for response to takeover request");
+        // set result to OK, in case we do not get a reply
+        // within the allocated period. This will allow the lock to be
+        // removed by this node. Note: do not break out of the loop here!
+        rc = SA_AIS_OK;
+      }
+    }
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+  }
+
+  LOG_NO("Result: %d", rc);
+  return rc;
+}
+
+SaAisErrorT Consensus::WriteTakeoverResult(
+    const std::string& timestamp, const std::string& current_owner,
+    const std::string& proposed_owner, const std::string& proposed_cluster_size,
+    const TakeoverState result) {
+  TRACE_ENTER();
+
+  const std::string takeover_request =
+      timestamp + " " + current_owner + " " + proposed_owner + " " +
+      proposed_cluster_size + " " +
+      TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
+
+  const std::string takeover_result =
+      timestamp + " " + current_owner + " " + proposed_owner + " " +
+      proposed_cluster_size + " " +
+      TakeoverStateStr[static_cast<std::uint8_t>(result)];
+
+  LOG_NO("TakeoverResult: %s", takeover_result.c_str());
+
+  SaAisErrorT rc;
+
+  // previous value must match
+  rc =
+      KeyValue::Set(kTakeoverRequestKeyname, takeover_result, takeover_request);
+
+  return rc;
+}
+
+SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) {
+  TRACE_ENTER();
+
+  std::string request;
+  SaAisErrorT rc;
+
+  rc = KeyValue::Get(kTakeoverRequestKeyname, request);
+  if (rc != SA_AIS_OK) {
+    // it doesn't always exist, don't log an error
+    LOG_NO("Could not read takeover request (%d)", rc);
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+
+  if (request.empty() == true) {
+    // on node shutdown, this could be empty
+    return SA_AIS_ERR_UNAVAILABLE;
+  }
+
+  Split(request, tokens);
+  if (tokens.size() != 5) {
+    LOG_ER("Invalid takeover request");
+    return SA_AIS_ERR_LIBRARY;
+  }
+
+  return SA_AIS_OK;
+}
+
+Consensus::TakeoverState Consensus::HandleTakeoverRequest(
+    const uint64_t cluster_size) {
+  TRACE_ENTER();
+
+  if (use_consensus_ == false) {
+    return TakeoverState::UNDEFINED;
+  }
+
+  SaAisErrorT rc;
+  uint32_t retries = 0;
+  std::vector<std::string> tokens;
+
+  // get request from KV store
+  rc = ReadTakeoverRequest(tokens);
+  while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = ReadTakeoverRequest(tokens);
+  }
+
+  if (rc != SA_AIS_OK) {
+    return TakeoverState::UNDEFINED;
+  }
+
+  // request is a space delimited string with 5 elements
+  osafassert(tokens.size() == 5);
+
+  // check the owner is this node
+  if (tokens[1] != base::Conf::NodeName()) {
+    LOG_ER("We do not own the lock. Ignoring takeover request");
+    return TakeoverState::UNDEFINED;
+  }
+
+  // expiration timestamp
+  const uint64_t expiration = strtoull(
+      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), 0,
+      10);
+
+  // size of the other network partition
+  const uint64_t proposed_cluster_size = strtoull(
+      tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_NETWORK_SIZE)]
+          .c_str(),
+      0, 10);
+
+  LOG_NO("Other network size: %" PRIu64 ", our network size: %" PRIu64,
+         proposed_cluster_size, cluster_size);
+
+  TakeoverState result;
+  if (CurrentTime() <= expiration && proposed_cluster_size > cluster_size) {
+    result = TakeoverState::ACCEPTED;
+  } else {
+    result = TakeoverState::REJECTED;
+  }
+
+  rc = WriteTakeoverResult(
+      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)],
+      tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)],
+      tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)],
+      tokens[static_cast<std::uint8_t>(
+          TakeoverElements::PROPOSED_NETWORK_SIZE)],
+      result);
+  if (rc != SA_AIS_OK) {
+    LOG_ER("Unable to write takeover result (%d)", rc);
+    return TakeoverState::UNDEFINED;
+  }
+
+  return result;
+}
+
+// separate space delimited elements in a string
+void Consensus::Split(const std::string& str,
+                      std::vector<std::string>& tokens) const {
+  std::stringstream stream(str);
+  std::string buffer;
+
+  while (stream >> buffer) {
+    tokens.push_back(buffer);
+  }
+}
+
+// seconds after epoch
+uint64_t Consensus::CurrentTime() const {
+  auto now = std::chrono::system_clock::now();
+
+  // seconds since epoch
+  auto timestamp =
+      std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
+          .count();
+
+  return timestamp;
+}
diff --git a/src/osaf/consensus/consensus.h b/src/osaf/consensus/consensus.h
index 9a8aa7e9b..abd17612c 100644
--- a/src/osaf/consensus/consensus.h
+++ b/src/osaf/consensus/consensus.h
@@ -17,14 +17,16 @@
 
 #include <chrono>
 #include <string>
-#include "saAis.h"
+#include <vector>
 #include "base/macros.h"
 #include "osaf/consensus/key_value.h"
+#include "saAis.h"
 
 class Consensus {
  public:
   // Set active controller to this node
-  SaAisErrorT PromoteThisNode();
+  SaAisErrorT PromoteThisNode(const bool graceful_takeover,
+    const uint64_t cluster_size);
 
   // Clear current active controller by releasing lock
   SaAisErrorT DemoteCurrentActive();
@@ -40,6 +42,9 @@ class Consensus {
   // in the callback
   void MonitorLock(ConsensusCallback callback, const uint32_t user_defined);
 
+  void MonitorTakeoverRequest(ConsensusCallback callback,
+                              const uint32_t user_defined);
+
   // Is consensus service enabled?
   bool IsEnabled() const;
 
@@ -52,17 +57,59 @@ class Consensus {
   Consensus();
   virtual ~Consensus();
 
+  static const std::string kTakeoverRequestKeyname;
+
+  enum class TakeoverState : std::uint8_t {
+    UNDEFINED = 0,
+    NEW = 1,
+    ACCEPTED = 2,
+    REJECTED = 3,
+  };
+
+  enum class TakeoverElements : std::uint8_t {
+    TIMESTAMP = 0,
+    CURRENT_OWNER = 1,
+    PROPOSED_OWNER = 2,
+    PROPOSED_NETWORK_SIZE = 3,
+    STATE = 4
+  };
+
+  const std::string TakeoverStateStr[4] = {"UNDEFINED", "NEW", "ACCEPTED",
+                                           "REJECTED"};
+
+  TakeoverState HandleTakeoverRequest(const uint64_t cluster_size);
+
  private:
   bool use_consensus_ = false;
   bool use_remote_fencing_ = false;
   const std::string kTestKeyname = "opensaf_write_test";
   const std::chrono::milliseconds kSleepInterval =
-    std::chrono::milliseconds(100);  // in ms
+    std::chrono::milliseconds(500);  // in ms
   static constexpr uint32_t kLockTimeout = 0;  // lock is persistent by default
-  static constexpr uint32_t kMaxRetry = 600;
+  static constexpr uint32_t kMaxTakeoverRetry = 20;
+  static constexpr uint32_t kMaxRetry = 60;
+  static constexpr uint32_t kTakeoverValidTime = 20;  // in seconds
+
+  void CheckForExistingTakeoverRequest();
+
+  SaAisErrorT CreateTakeoverRequest(const std::string& current_owner,
+                                    const std::string& proposed_owner,
+                                    const uint64_t cluster_size);
+
+  SaAisErrorT ReadTakeoverRequest(std::vector<std::string>& tokens);
+
+  SaAisErrorT WriteTakeoverResult(const std::string& timestamp,
+                                  const std::string& current_owner,
+                                  const std::string& proposed_owner,
+                                  const std::string& proposed_cluster_size,
+                                  const TakeoverState result);
+
   SaAisErrorT Demote(const std::string& node);
   bool FenceNode(const std::string& node);
 
+  void Split(const std::string& str, std::vector<std::string>& tokens) const;
+  uint64_t CurrentTime() const;
+
   DELETE_COPY_AND_MOVE_OPERATORS(Consensus);
 };
 
diff --git a/src/osaf/consensus/key_value.cc b/src/osaf/consensus/key_value.cc
index dbf07520b..80950e7cb 100644
--- a/src/osaf/consensus/key_value.cc
+++ b/src/osaf/consensus/key_value.cc
@@ -45,9 +45,9 @@ int KeyValue::Execute(const std::string& command, std::string& output) {
 SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " get " + key);
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " get \"" + key + "\"");
   int rc = KeyValue::Execute(command, value);
   TRACE("Read '%s'", value.c_str());
 
@@ -61,9 +61,10 @@ SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) {
 SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " set " + key + " " + value);
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " set \"" + key + "\" \"" + value +
+                            "\"");
   std::string output;
   int rc = KeyValue::Execute(command, output);
 
@@ -74,12 +75,47 @@ SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value) {
   }
 }
 
+SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value,
+                          const std::string& prev_value) {
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " set_if_prev \"" + key + "\" \"" +
+                            value + "\" \"" + prev_value + "\"");
+  std::string output;
+  int rc = KeyValue::Execute(command, output);
+
+  if (rc == 0) {
+    return SA_AIS_OK;
+  } else {
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+}
+
+SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value) {
+  TRACE_ENTER();
+
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " create \"" + key + "\" \"" +
+                            value + "\"");
+  std::string output;
+  int rc = KeyValue::Execute(command, output);
+
+  if (rc == 0) {
+    return SA_AIS_OK;
+  } else if (rc == 1) {
+    return SA_AIS_ERR_EXIST;
+  } else {
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+}
+
 SaAisErrorT KeyValue::Erase(const std::string& key) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " erase " + key);
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " erase \"" + key + "\"");
   std::string output;
   int rc = KeyValue::Execute(command, output);
 
@@ -91,13 +127,13 @@ SaAisErrorT KeyValue::Erase(const std::string& key) {
 }
 
 SaAisErrorT KeyValue::Lock(const std::string& owner,
-                         const unsigned int timeout) {
+                           const unsigned int timeout) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " lock " + owner + " " +
-    std::to_string(timeout));
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " lock \"" + owner + "\" " +
+                            std::to_string(timeout));
   std::string output;
   int rc = KeyValue::Execute(command, output);
 
@@ -105,8 +141,10 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
     return SA_AIS_OK;
   } else if (rc == 1) {
     // already locked
+    LOG_NO("Locked failed: %s", output.c_str());
     return SA_AIS_ERR_EXIST;
   } else {
+    LOG_NO("Locked failed: %s", output.c_str());
     return SA_AIS_ERR_TRY_AGAIN;
   }
 }
@@ -114,15 +152,16 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
 SaAisErrorT KeyValue::Unlock(const std::string& owner) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " unlock " + owner);
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " unlock \"" + owner + "\"");
   std::string output;
   int rc = Execute(command, output);
 
   if (rc == 0) {
     return SA_AIS_OK;
   } else if (rc == 1) {
+    LOG_NO("Unlock failed: %s", output.c_str());
     LOG_ER("Lock is owned by another node");
     return SA_AIS_ERR_INVALID_PARAM;
   } else {
@@ -133,8 +172,8 @@ SaAisErrorT KeyValue::Unlock(const std::string& owner) {
 SaAisErrorT KeyValue::LockOwner(std::string& owner) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
   const std::string command(kv_store_cmd + " lock_owner");
   std::string output;
   int rc = KeyValue::Execute(command, output);
@@ -145,23 +184,24 @@ SaAisErrorT KeyValue::LockOwner(std::string& owner) {
     return SA_AIS_OK;
   }
 
+  // put output in owner, for debugging purposes
+  owner = output;
   return SA_AIS_ERR_FAILED_OPERATION;
 }
 
 namespace {
 
 static constexpr std::chrono::milliseconds kSleepInterval =
-  std::chrono::milliseconds(100);  // in ms
+    std::chrono::milliseconds(100);  // in ms
 static constexpr uint32_t kMaxRetry = 100;
 
-void WatchKeyFunction(const std::string& key,
-  const ConsensusCallback& callback,
-  const uint32_t user_defined) {
+void WatchKeyFunction(const std::string& key, const ConsensusCallback& callback,
+                      const uint32_t user_defined) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
-  const std::string command(kv_store_cmd + " watch " + key);
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " watch \"" + key + "\"");
   std::string value;
   uint32_t retries = 0;
   int rc;
@@ -183,11 +223,11 @@ void WatchKeyFunction(const std::string& key,
 }
 
 void WatchLockFunction(const ConsensusCallback& callback,
-  const uint32_t user_defined) {
+                       const uint32_t user_defined) {
   TRACE_ENTER();
 
-  const std::string kv_store_cmd = base::GetEnv(
-    "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
   const std::string command(kv_store_cmd + " watch_lock");
   std::string value;
   uint32_t retries = 0;
@@ -211,16 +251,15 @@ void WatchLockFunction(const ConsensusCallback& callback,
 
 }  // namespace
 
-void KeyValue::Watch(const std::string& key,
-  const ConsensusCallback callback,
-  const uint32_t user_defined) {
+void KeyValue::Watch(const std::string& key, const ConsensusCallback callback,
+                     const uint32_t user_defined) {
   std::thread t(WatchKeyFunction, key, callback, user_defined);
   t.detach();
   return;
 }
 
 void KeyValue::WatchLock(const ConsensusCallback callback,
-  const uint32_t user_defined) {
+                         const uint32_t user_defined) {
   std::thread t(WatchLockFunction, callback, user_defined);
   t.detach();
   return;
diff --git a/src/osaf/consensus/key_value.h b/src/osaf/consensus/key_value.h
index 853f303ff..b003629c0 100644
--- a/src/osaf/consensus/key_value.h
+++ b/src/osaf/consensus/key_value.h
@@ -20,9 +20,9 @@
 #include <thread>
 #include "saAis.h"
 
-typedef std::function<void(const std::string& key,
-  const std::string& new_value,
-  const uint32_t user_defined)> ConsensusCallback;
+typedef std::function<void(const std::string& key, const std::string& new_value,
+                           const uint32_t user_defined)>
+    ConsensusCallback;
 
 class KeyValue {
  public:
@@ -32,13 +32,20 @@ class KeyValue {
   // Set key to value
   static SaAisErrorT Set(const std::string& key, const std::string& value);
 
+  // Set key to value only if prev value matches
+  static SaAisErrorT Set(const std::string& key, const std::string& value,
+                         const std::string& prev_value);
+
+  // Create key, and set to value. Fails if key already exists.
+  static SaAisErrorT Create(const std::string& key, const std::string& value);
+
   // Erase key
   static SaAisErrorT Erase(const std::string& key);
 
   // Obtain lock, default timeout is 0 seconds (indefinite). If lock
   // is called when already locked, the timeout is extended
   static SaAisErrorT Lock(const std::string& owner,
-    const unsigned int timeout = 0);
+                          const unsigned int timeout = 0);
 
   // Release lock
   static SaAisErrorT Unlock(const std::string& owner);
@@ -48,11 +55,11 @@ class KeyValue {
 
   // starts a thread to watch key and call callback if values changes
   static void Watch(const std::string& key, ConsensusCallback callback,
-    const uint32_t user_defined);
+                    const uint32_t user_defined);
 
   // starts a thread to watch the lock and call callback if is modified
   static void WatchLock(ConsensusCallback callback,
-    const uint32_t user_defined);
+                        const uint32_t user_defined);
 
   // internal use
   static int Execute(const std::string& command, std::string& output);
-- 
2.14.1


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to