* add a timeout parameter to create() in the plugins
* remove the timestamp from the takeover request and rely on the
  KV store's built-in key expiry (TTL / lease) instead
---
 src/osaf/consensus/consensus.cc          | 165 ++++++++++++-------------------
 src/osaf/consensus/consensus.h           |  21 ++--
 src/osaf/consensus/key_value.cc          |   5 +-
 src/osaf/consensus/key_value.h           |   3 +-
 src/osaf/consensus/plugins/etcd.plugin   |  12 ++-
 src/osaf/consensus/plugins/etcd3.plugin  |  39 ++++++--
 src/osaf/consensus/plugins/sample.plugin |  10 +-
 7 files changed, 124 insertions(+), 131 deletions(-)

diff --git a/src/osaf/consensus/consensus.cc b/src/osaf/consensus/consensus.cc
index f2245dd01..a6248136d 100644
--- a/src/osaf/consensus/consensus.cc
+++ b/src/osaf/consensus/consensus.cc
@@ -25,7 +25,7 @@
 const std::string Consensus::kTakeoverRequestKeyname = "takeover_request";
 
 SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover,
-    const uint64_t cluster_size) {
+                                       const uint64_t cluster_size) {
   TRACE_ENTER();
   SaAisErrorT rc;
 
@@ -89,18 +89,10 @@ SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover,
     }
 
     if (take_over_request_created == true) {
-      SaAisErrorT rc1;
-
-      // remove takeover request
-      rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
-      retries = 0;
-      while (rc1 != SA_AIS_OK && retries < kMaxRetry) {
-        ++retries;
-        std::this_thread::sleep_for(kSleepInterval);
-        rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
-      }
-
+      // rc is returned by this function; keep it intact and report the
+      // cleanup result through a separate status
+      const SaAisErrorT rc1 = RemoveTakeoverRequest();
       if (rc1 != SA_AIS_OK) {
         LOG_WA("Could not remove takeover request");
       }
     }
@@ -115,6 +105,22 @@ SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover,
   return rc;
 }
 
+// Erase the takeover request, retrying up to kMaxRetry times with
+// kSleepInterval between attempts. Returns the result of the last attempt.
+SaAisErrorT Consensus::RemoveTakeoverRequest() {
+  TRACE_ENTER();
+
+  SaAisErrorT rc = KeyValue::Erase(kTakeoverRequestKeyname);
+  uint32_t retries = 0;
+  while (rc != SA_AIS_OK && retries < kMaxRetry) {
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = KeyValue::Erase(kTakeoverRequestKeyname);
+  }
+
+  return rc;
+}
+
 SaAisErrorT Consensus::Demote(const std::string& node) {
   TRACE_ENTER();
   if (use_consensus_ == false) {
@@ -273,6 +280,8 @@ void Consensus::MonitorTakeoverRequest(ConsensusCallback callback,
 }
 
 void Consensus::CheckForExistingTakeoverRequest() {
+  TRACE_ENTER();
+
   SaAisErrorT rc;
   std::vector<std::string> tokens;
   rc = ReadTakeoverRequest(tokens);
@@ -281,22 +290,17 @@ void Consensus::CheckForExistingTakeoverRequest() {
     return;
   }
 
-  // get expiration
-  const uint64_t expiration_timestamp = strtoull(
-      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(),
-      0, 10);
+  LOG_NO("A takeover request is in progress");
 
-  // wait until expiration is over
-  int64_t expiration = expiration_timestamp - CurrentTime();
-  if (expiration > 0 && expiration <= kTakeoverValidTime) {
-    // @todo check if the takeover request has been deleted already?
-    LOG_NO("A takeover request is in progress"
-           " (expiring in %" PRId64 " seconds)",
-           expiration);
-    std::chrono::seconds sleep_duration(expiration);
-    std::this_thread::sleep_for(sleep_duration);
-  } else {
-    LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
+  // The read above found the request; poll until it is removed (or has
+  // expired in the KV store), for at most kMaxTakeoverRetry * kSleepInterval
+  // (approximately 10 seconds).
+  uint32_t retries = 0;
+  while (rc == SA_AIS_OK && retries < kMaxTakeoverRetry) {
+    ++retries;
+    TRACE("Takeover request still present");
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = ReadTakeoverRequest(tokens);
   }
 }
 
@@ -306,26 +310,28 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner,
   TRACE_ENTER();
 
   // Format of takeover request:
-  // "expiration_time<space>current_owner<space>proposed_owner<space>
+  // "current_owner<space>proposed_owner<space>
   // proposed_owner_cluster_size<space>status"
   // status := [UNDEFINED, NEW, REJECTED, ACCEPTED]
 
   std::string takeover_request;
-  // request to expire in 20 seconds
-  uint64_t timestamp = CurrentTime() + kTakeoverValidTime;
 
   takeover_request =
-      std::to_string(timestamp) + " " + current_owner + " " +
-      base::Conf::NodeName() + " " + std::to_string(cluster_size) + " " +
+      current_owner + " " + base::Conf::NodeName() + " " +
+      std::to_string(cluster_size) + " " +
       TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
 
   TRACE("Takeover request: \"%s\"", takeover_request.c_str());
 
   SaAisErrorT rc;
   uint32_t retries = 0;
-  rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+  // ask the KV store to expire the request after kTakeoverValidTime seconds
+  rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request,
+                        kTakeoverValidTime);
   while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
     ++retries;
     std::this_thread::sleep_for(kSleepInterval);
-    rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+    // retries must pass the same timeout, otherwise the request never expires
+    rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request,
+                          kTakeoverValidTime);
   }
@@ -336,49 +339,31 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner,
     // retrieve takeover request
     std::vector<std::string> tokens;
     retries = 0;
+    // wait up to approximately 10 seconds, or until the takeover request is
+    // gone
     rc = ReadTakeoverRequest(tokens);
-    while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+    while (rc == SA_AIS_OK &&
+           retries < kMaxTakeoverRetry) {
       ++retries;
+      TRACE("Takeover request still present");
       std::this_thread::sleep_for(kSleepInterval);
       rc = ReadTakeoverRequest(tokens);
     }
 
-    if (rc == SA_AIS_OK) {
-      // get expiration
-      const uint64_t expiration_timestamp = strtoull(
-        tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(),
-        0, 10);
-
-      // wait until expiration is over
-      int64_t expiration = expiration_timestamp - CurrentTime();
-      if (expiration > 0 && expiration <= kTakeoverValidTime) {
-        LOG_NO("A takeover request is in progress"
-               " (expiring in %" PRId64 " seconds)",
-               expiration);
-        std::chrono::seconds sleep_duration(expiration);
-        std::this_thread::sleep_for(sleep_duration);
-      } else {
-        LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
-      }
-    }  // else remove it anyway
-
-    LOG_NO("Remove expired takeover request");
-
-    // remove expired request
-    retries = 0;
-    rc = KeyValue::Erase(kTakeoverRequestKeyname);
-    while (rc != SA_AIS_OK && retries < kMaxRetry) {
-      ++retries;
-      std::this_thread::sleep_for(kSleepInterval);
-      rc = KeyValue::Erase(kTakeoverRequestKeyname);
+    if (rc == SA_AIS_OK || rc == SA_AIS_ERR_LIBRARY) {
+      // The request is still present, or it is malformed (e.g. written in
+      // an older format). Remove it, otherwise the call below would find it
+      // again and recurse without end.
+      rc = RemoveTakeoverRequest();
+      if (rc != SA_AIS_OK) {
+        LOG_ER("Could not remove existing takeover request");
+        return SA_AIS_ERR_EXIST;
+      }
     }
 
-    if (rc == SA_AIS_OK) {
-      return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size);
-    } else {
-      LOG_ER("Could not remove existing takeover request");
-      return SA_AIS_ERR_EXIST;
-    }
+    LOG_NO("Takeover request expired or removed");
+
+    return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size);
   }
 
   // wait up to 10s for request to be answered
@@ -421,24 +404,21 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner,
     std::this_thread::sleep_for(kSleepInterval);
   }
 
-  LOG_NO("Result: %d", rc);
+  TRACE("Result: %d", rc);
   return rc;
 }
 
 SaAisErrorT Consensus::WriteTakeoverResult(
-    const std::string& timestamp, const std::string& current_owner,
-    const std::string& proposed_owner, const std::string& proposed_cluster_size,
-    const TakeoverState result) {
+    const std::string& current_owner, const std::string& proposed_owner,
+    const std::string& proposed_cluster_size, const TakeoverState result) {
   TRACE_ENTER();
 
   const std::string takeover_request =
-      timestamp + " " + current_owner + " " + proposed_owner + " " +
-      proposed_cluster_size + " " +
+      current_owner + " " + proposed_owner + " " + proposed_cluster_size + " " +
       TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
 
   const std::string takeover_result =
-      timestamp + " " + current_owner + " " + proposed_owner + " " +
-      proposed_cluster_size + " " +
+      current_owner + " " + proposed_owner + " " + proposed_cluster_size + " " +
       TakeoverStateStr[static_cast<std::uint8_t>(result)];
 
   LOG_NO("TakeoverResult: %s", takeover_result.c_str());
@@ -461,7 +441,7 @@ SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) {
   rc = KeyValue::Get(kTakeoverRequestKeyname, request);
   if (rc != SA_AIS_OK) {
     // it doesn't always exist, don't log an error
-    LOG_NO("Could not read takeover request (%d)", rc);
+    TRACE("Could not read takeover request (%d)", rc);
     return SA_AIS_ERR_FAILED_OPERATION;
   }
 
@@ -470,12 +450,14 @@ SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) {
     return SA_AIS_ERR_UNAVAILABLE;
   }
 
+  tokens.clear();
   Split(request, tokens);
-  if (tokens.size() != 5) {
-    LOG_ER("Invalid takeover request");
+  if (tokens.size() != 4) {
+    LOG_ER("Invalid takeover request: '%s'", request.c_str());
     return SA_AIS_ERR_LIBRARY;
   }
 
+  TRACE("Found '%s'", request.c_str());
   return SA_AIS_OK;
 }
 
@@ -503,20 +485,16 @@ Consensus::TakeoverState Consensus::HandleTakeoverRequest(
     return TakeoverState::UNDEFINED;
   }
 
-  // request is a space delimited string with 5 elements
-  osafassert(tokens.size() == 5);
+  // request is a space delimited string with 4 elements
+  osafassert(tokens.size() == 4);
 
   // check the owner is this node
-  if (tokens[1] != base::Conf::NodeName()) {
+  if (tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)] !=
+      base::Conf::NodeName()) {
     LOG_ER("We do not own the lock. Ignoring takeover request");
     return TakeoverState::UNDEFINED;
   }
 
-  // expiration timestamp
-  const uint64_t expiration = strtoull(
-      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), 0,
-      10);
-
   // size of the other network partition
   const uint64_t proposed_cluster_size = strtoull(
       tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_NETWORK_SIZE)]
@@ -527,14 +505,13 @@ Consensus::TakeoverState Consensus::HandleTakeoverRequest(
          proposed_cluster_size, cluster_size);
 
   TakeoverState result;
-  if (CurrentTime() <= expiration && proposed_cluster_size > cluster_size) {
+  if (proposed_cluster_size > cluster_size) {
     result = TakeoverState::ACCEPTED;
   } else {
     result = TakeoverState::REJECTED;
   }
 
   rc = WriteTakeoverResult(
-      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)],
       tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)],
       tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)],
       tokens[static_cast<std::uint8_t>(
@@ -558,15 +535,3 @@ void Consensus::Split(const std::string& str,
     tokens.push_back(buffer);
   }
 }
-
-// seconds after epoch
-uint64_t Consensus::CurrentTime() const {
-  auto now = std::chrono::system_clock::now();
-
-  // seconds since epoch
-  auto timestamp =
-      std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
-          .count();
-
-  return timestamp;
-}
diff --git a/src/osaf/consensus/consensus.h b/src/osaf/consensus/consensus.h
index abd17612c..f6493bedc 100644
--- a/src/osaf/consensus/consensus.h
+++ b/src/osaf/consensus/consensus.h
@@ -26,7 +26,7 @@ class Consensus {
  public:
   // Set active controller to this node
   SaAisErrorT PromoteThisNode(const bool graceful_takeover,
-    const uint64_t cluster_size);
+                              const uint64_t cluster_size);
 
   // Clear current active controller by releasing lock
   SaAisErrorT DemoteCurrentActive();
@@ -67,11 +67,10 @@ class Consensus {
   };
 
   enum class TakeoverElements : std::uint8_t {
-    TIMESTAMP = 0,
-    CURRENT_OWNER = 1,
-    PROPOSED_OWNER = 2,
-    PROPOSED_NETWORK_SIZE = 3,
-    STATE = 4
+    CURRENT_OWNER = 0,
+    PROPOSED_OWNER = 1,
+    PROPOSED_NETWORK_SIZE = 2,
+    STATE = 3
   };
 
   const std::string TakeoverStateStr[4] = {"UNDEFINED", "NEW", "ACCEPTED",
@@ -84,11 +83,11 @@ class Consensus {
   bool use_remote_fencing_ = false;
   const std::string kTestKeyname = "opensaf_write_test";
   const std::chrono::milliseconds kSleepInterval =
-    std::chrono::milliseconds(500);  // in ms
+      std::chrono::milliseconds(500);  // delay between retries
   static constexpr uint32_t kLockTimeout = 0;  // lock is persistent by default
   static constexpr uint32_t kMaxTakeoverRetry = 20;
   static constexpr uint32_t kMaxRetry = 60;
-  static constexpr uint32_t kTakeoverValidTime = 20;  // in seconds
+  static constexpr uint32_t kTakeoverValidTime = 15;  // key TTL, in seconds
 
   void CheckForExistingTakeoverRequest();
 
@@ -98,17 +97,17 @@ class Consensus {
 
   SaAisErrorT ReadTakeoverRequest(std::vector<std::string>& tokens);
 
-  SaAisErrorT WriteTakeoverResult(const std::string& timestamp,
-                                  const std::string& current_owner,
+  SaAisErrorT WriteTakeoverResult(const std::string& current_owner,
                                   const std::string& proposed_owner,
                                   const std::string& proposed_cluster_size,
                                   const TakeoverState result);
 
+  SaAisErrorT RemoveTakeoverRequest();
+
   SaAisErrorT Demote(const std::string& node);
   bool FenceNode(const std::string& node);
 
   void Split(const std::string& str, std::vector<std::string>& tokens) const;
-  uint64_t CurrentTime() const;
 
   DELETE_COPY_AND_MOVE_OPERATORS(Consensus);
 };
diff --git a/src/osaf/consensus/key_value.cc b/src/osaf/consensus/key_value.cc
index 80950e7cb..cf5c02213 100644
--- a/src/osaf/consensus/key_value.cc
+++ b/src/osaf/consensus/key_value.cc
@@ -91,13 +91,14 @@ SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value,
   }
 }
 
-SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value) {
+SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value,
+                             const unsigned int timeout) {
   TRACE_ENTER();
 
   const std::string kv_store_cmd =
       base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
   const std::string command(kv_store_cmd + " create \"" + key + "\" \"" +
-                            value + "\"");
+                            value + "\" " + std::to_string(timeout));
   std::string output;
   int rc = KeyValue::Execute(command, output);
 
diff --git a/src/osaf/consensus/key_value.h b/src/osaf/consensus/key_value.h
index b003629c0..af9717595 100644
--- a/src/osaf/consensus/key_value.h
+++ b/src/osaf/consensus/key_value.h
@@ -37,7 +37,10 @@ class KeyValue {
                          const std::string& prev_value);
 
   // Create key, and set to value. Fails if key already exists.
-  static SaAisErrorT Create(const std::string& key, const std::string& value);
+  // timeout is in seconds and is passed to the plugin, which should let the
+  // key expire after that time; 0 (the default) means the key never expires.
+  static SaAisErrorT Create(const std::string& key, const std::string& value,
+                            const unsigned int timeout = 0);
 
   // Erase key
   static SaAisErrorT Erase(const std::string& key);
diff --git a/src/osaf/consensus/plugins/etcd.plugin b/src/osaf/consensus/plugins/etcd.plugin
index 6ed85ac92..b585bb657 100644
--- a/src/osaf/consensus/plugins/etcd.plugin
+++ b/src/osaf/consensus/plugins/etcd.plugin
@@ -67,6 +67,7 @@ setkey() {
 # params:
 #   $1 - <key>
 #   $2 - <value>
+#   $3 - <timeout> in seconds (0 means the key does not expire)
 # returns:
 #   0 - success
 #   1 - already exists
@@ -74,9 +75,10 @@ setkey() {
 create_key() {
   readonly key="$1"
   readonly value="$2"
+  readonly timeout="$3"
 
   if output=$(etcdctl $etcd_options --timeout $etcd_timeout mk "$directory$key" \
-    "$value" 2>&1)
+    "$value" --ttl "$timeout" 2>&1)
   then
     return 0
   else
@@ -277,11 +279,11 @@ case "$1" in
     exit $?
     ;;
   create)
-    if [ "$#" -ne 3 ]; then
-      echo "Usage: $0 create <key> <value>"
-      exit 1
+    if [ "$#" -ne 4 ]; then
+      echo "Usage: $0 create <key> <value> <timeout>"
+      exit 125
     fi
-    create_key "$2" "$3"
+    create_key "$2" "$3" "$4"
     exit $?
     ;;
   erase)
diff --git a/src/osaf/consensus/plugins/etcd3.plugin b/src/osaf/consensus/plugins/etcd3.plugin
index 451440567..07fc9f2e1 100644
--- a/src/osaf/consensus/plugins/etcd3.plugin
+++ b/src/osaf/consensus/plugins/etcd3.plugin
@@ -33,10 +33,17 @@ export ETCDCTL_API=3
 get() {
   readonly key="$1"
 
-  if value=$(etcdctl $etcd_options --dial-timeout $etcd_timeout get "$directory$key" | tail -n1)
+  if output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout get "$directory$key" 2>&1)
   then
-    echo "$value"
-    return 0
+    key_=$(echo "$output" | tail -n2 | head -n1)
+    value=$(echo "$output" | tail -n1)
+    if [ "$key_" = "$directory$key" ]; then
+      echo "$value"
+      return 0
+    else
+      # key missing!
+      return 1
+    fi
   else
     return 1
   fi
@@ -69,6 +76,7 @@ setkey() {
 # params:
 #   $1 - <key>
 #   $2 - <value>
+#   $3 - <timeout> in seconds (0 means no lease, the key does not expire)
 # returns:
 #   0 - success
 #   1 - already exists
@@ -76,10 +84,25 @@ setkey() {
 create_key() {
   readonly key="$1"
   readonly value="$2"
+  readonly timeout="$3"
+
+  if [ "$timeout" -gt 0 ]; then
+    # create lease
+    if output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout lease grant "$timeout")
+    then
+      lease_id=$(echo "$output" | awk '{print $2}')
+      lease_param="--lease=$lease_id"
+    else
+      return 2
+    fi
+  else
+    lease_param=""
+  fi
+
   # first try to create the key
   transaction="create(\""$directory$key"\") = \"0\"
 
-    put \""$directory$key"\" \""$value"\"
+    put \""$directory$key"\" \""$value"\" "$lease_param"
 
   "
   output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout txn <<< "$transaction")
@@ -299,11 +322,11 @@ case "$1" in
     exit $?
     ;;
   create)
-    if [ "$#" -ne 3 ]; then
-      echo "Usage: $0 create <key> <value>"
-      exit 1
+    if [ "$#" -ne 4 ]; then
+      echo "Usage: $0 create <key> <value> <timeout>"
+      exit 125
     fi
-    create_key "$2" "$3"
+    create_key "$2" "$3" "$4"
     exit $?
     ;;
   erase)
diff --git a/src/osaf/consensus/plugins/sample.plugin b/src/osaf/consensus/plugins/sample.plugin
index 445cf8d84..6f6c71f6f 100644
--- a/src/osaf/consensus/plugins/sample.plugin
+++ b/src/osaf/consensus/plugins/sample.plugin
@@ -50,6 +50,7 @@ setkey() {
 # params:
 #   $1 - <key>
 #   $2 - <value>
+#   $3 - <timeout> in seconds (0 means the key does not expire)
 # returns:
 #   0 - success
 #   1 - already exists
@@ -57,6 +58,7 @@ setkey() {
 create_key() {
   readonly key="$1"
   readonly value="$2"
+  readonly timeout="$3"
   ...
 }
 
@@ -169,11 +171,11 @@ case "$1" in
     exit $?
     ;;
   create)
-    if [ "$#" -ne 3 ]; then
-      echo "Usage: $0 create <key> <value>"
-      exit 1
+    if [ "$#" -ne 4 ]; then
+      echo "Usage: $0 create <key> <value> <timeout>"
+      exit 125
     fi
-    create_key "$2" "$3"
+    create_key "$2" "$3" "$4"
     exit $?
     ;;
   erase)
-- 
2.14.1


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to