* update create() in the plugins to take a timeout parameter (in seconds; 0 means the key never expires) - existing plugins must be updated, as 'create' now requires a third <timeout> argument * remove the timestamp from the takeover request and rely on the KV store's built-in key expiry (TTL) instead --- src/osaf/consensus/consensus.cc | 165 ++++++++++++------------------- src/osaf/consensus/consensus.h | 21 ++-- src/osaf/consensus/key_value.cc | 5 +- src/osaf/consensus/key_value.h | 3 +- src/osaf/consensus/plugins/etcd.plugin | 12 ++- src/osaf/consensus/plugins/etcd3.plugin | 39 ++++++-- src/osaf/consensus/plugins/sample.plugin | 10 +- 7 files changed, 124 insertions(+), 131 deletions(-)
diff --git a/src/osaf/consensus/consensus.cc b/src/osaf/consensus/consensus.cc index f2245dd01..a6248136d 100644 --- a/src/osaf/consensus/consensus.cc +++ b/src/osaf/consensus/consensus.cc @@ -25,7 +25,7 @@ const std::string Consensus::kTakeoverRequestKeyname = "takeover_request"; SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover, - const uint64_t cluster_size) { + const uint64_t cluster_size) { TRACE_ENTER(); SaAisErrorT rc; @@ -89,18 +89,9 @@ SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover, } if (take_over_request_created == true) { - SaAisErrorT rc1; - - // remove takeover request - rc1 = KeyValue::Erase(kTakeoverRequestKeyname); - retries = 0; - while (rc1 != SA_AIS_OK && retries < kMaxRetry) { - ++retries; - std::this_thread::sleep_for(kSleepInterval); - rc1 = KeyValue::Erase(kTakeoverRequestKeyname); - } - - if (rc1 != SA_AIS_OK) { + // use a separate status so rc keeps the result of the promotion itself + const SaAisErrorT rc1 = RemoveTakeoverRequest(); + if (rc1 != SA_AIS_OK) { LOG_WA("Could not remove takeover request"); } } @@ -115,6 +106,22 @@ SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover, return rc; } +SaAisErrorT Consensus::RemoveTakeoverRequest() { + TRACE_ENTER(); + SaAisErrorT rc; + uint32_t retries = 0; + + // erase the takeover request, retrying up to kMaxRetry times on failure + rc = KeyValue::Erase(kTakeoverRequestKeyname); + while (rc != SA_AIS_OK && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc = KeyValue::Erase(kTakeoverRequestKeyname); + } + + return rc; +} + SaAisErrorT Consensus::Demote(const std::string& node) { TRACE_ENTER(); if (use_consensus_ == false) { @@ -273,6 +280,8 @@ void Consensus::MonitorTakeoverRequest(ConsensusCallback callback, } void Consensus::CheckForExistingTakeoverRequest() { + TRACE_ENTER(); + SaAisErrorT rc; std::vector<std::string> tokens; rc = ReadTakeoverRequest(tokens); @@ -281,22 +290,17 @@ void Consensus::CheckForExistingTakeoverRequest() { return; } - // get expiration - const uint64_t expiration_timestamp = strtoull( -
tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), - 0, 10); + LOG_NO("A takeover request is in progress"); - // wait until expiration is over - int64_t expiration = expiration_timestamp - CurrentTime(); - if (expiration > 0 && expiration <= kTakeoverValidTime) { - // @todo check if the takeover request has been deleted already? - LOG_NO("A takeover request is in progress" - " (expiring in %" PRId64 " seconds)", - expiration); - std::chrono::seconds sleep_duration(expiration); - std::this_thread::sleep_for(sleep_duration); - } else { - LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp); + uint32_t retries = 0; + // wait up to approximately 10 seconds, or until the takeover request is gone + rc = ReadTakeoverRequest(tokens); + while (rc == SA_AIS_OK && + retries < kMaxTakeoverRetry) { + ++retries; + TRACE("Takeover request still present"); + std::this_thread::sleep_for(kSleepInterval); + rc = ReadTakeoverRequest(tokens); } } @@ -306,24 +310,23 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner, TRACE_ENTER(); // Format of takeover request: - // "expiration_time<space>current_owner<space>proposed_owner<space> + // "current_owner<space>proposed_owner<space> // proposed_owner_cluster_size<space>status" // status := [UNDEFINED, NEW, REJECTED, ACCEPTED] std::string takeover_request; - // request to expire in 20 seconds - uint64_t timestamp = CurrentTime() + kTakeoverValidTime; takeover_request = - std::to_string(timestamp) + " " + current_owner + " " + - base::Conf::NodeName() + " " + std::to_string(cluster_size) + " " + + current_owner + " " + base::Conf::NodeName() + " " + + std::to_string(cluster_size) + " " + TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)]; TRACE("Takeover request: \"%s\"", takeover_request.c_str()); SaAisErrorT rc; uint32_t retries = 0; - rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request); + rc = KeyValue::Create(kTakeoverRequestKeyname, 
takeover_request, + kTakeoverValidTime); while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) { ++retries; std::this_thread::sleep_for(kSleepInterval); @@ -336,49 +339,29 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner, // retrieve takeover request std::vector<std::string> tokens; retries = 0; + // wait up to approximately 10 seconds, or until the takeover request is gone rc = ReadTakeoverRequest(tokens); - while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) { + while (rc == SA_AIS_OK && retries < kMaxTakeoverRetry) { ++retries; + TRACE("Takeover request still present"); std::this_thread::sleep_for(kSleepInterval); rc = ReadTakeoverRequest(tokens); } if (rc == SA_AIS_OK) { - // get expiration - const uint64_t expiration_timestamp = strtoull( - tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), - 0, 10); - - // wait until expiration is over - int64_t expiration = expiration_timestamp - CurrentTime(); - if (expiration > 0 && expiration <= kTakeoverValidTime) { - LOG_NO("A takeover request is in progress" - " (expiring in %" PRId64 " seconds)", - expiration); - std::chrono::seconds sleep_duration(expiration); - std::this_thread::sleep_for(sleep_duration); - } else { - LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp); - } - } // else remove it anyway - - LOG_NO("Remove expired takeover request"); - - // remove expired request - retries = 0; - rc = KeyValue::Erase(kTakeoverRequestKeyname); - while (rc != SA_AIS_OK && retries < kMaxRetry) { - ++retries; - std::this_thread::sleep_for(kSleepInterval); + // still there?
We need to forcibly remove it + retries = 0; rc = KeyValue::Erase(kTakeoverRequestKeyname); + while (rc != SA_AIS_OK && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc = KeyValue::Erase(kTakeoverRequestKeyname); + } } - if (rc == SA_AIS_OK) { - return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size); - } else { - LOG_ER("Could not remove existing takeover request"); - return SA_AIS_ERR_EXIST; - } + LOG_NO("Takeover request expired or removed"); + + return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size); } // wait up to 10s for request to be answered @@ -421,24 +404,21 @@ SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner, std::this_thread::sleep_for(kSleepInterval); } - LOG_NO("Result: %d", rc); + TRACE("Result: %d", rc); return rc; } SaAisErrorT Consensus::WriteTakeoverResult( - const std::string& timestamp, const std::string& current_owner, - const std::string& proposed_owner, const std::string& proposed_cluster_size, - const TakeoverState result) { + const std::string& current_owner, const std::string& proposed_owner, + const std::string& proposed_cluster_size, const TakeoverState result) { TRACE_ENTER(); const std::string takeover_request = - timestamp + " " + current_owner + " " + proposed_owner + " " + - proposed_cluster_size + " " + + current_owner + " " + proposed_owner + " " + proposed_cluster_size + " " + TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)]; const std::string takeover_result = - timestamp + " " + current_owner + " " + proposed_owner + " " + - proposed_cluster_size + " " + + current_owner + " " + proposed_owner + " " + proposed_cluster_size + " " + TakeoverStateStr[static_cast<std::uint8_t>(result)]; LOG_NO("TakeoverResult: %s", takeover_result.c_str()); @@ -441,7 +421,7 @@ SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) { rc = KeyValue::Get(kTakeoverRequestKeyname, request); if (rc != SA_AIS_OK) { // it doesn't always exist, don't log an error -
LOG_NO("Could not read takeover request (%d)", rc); + TRACE("Could not read takeover request (%d)", rc); return SA_AIS_ERR_FAILED_OPERATION; } @@ -470,12 +450,14 @@ SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) { return SA_AIS_ERR_UNAVAILABLE; } + tokens.clear();  // Split() appends, so drop tokens from any earlier read Split(request, tokens); - if (tokens.size() != 5) { - LOG_ER("Invalid takeover request"); + if (tokens.size() != 4) {  // one token per TakeoverElements entry + LOG_ER("Invalid takeover request: '%s'", request.c_str()); return SA_AIS_ERR_LIBRARY; } + TRACE("Found '%s'", request.c_str()); return SA_AIS_OK; } @@ -503,20 +485,16 @@ Consensus::TakeoverState Consensus::HandleTakeoverRequest( return TakeoverState::UNDEFINED; } - // request is a space delimited string with 5 elements - osafassert(tokens.size() == 5); + // request is a space delimited string with 4 elements + osafassert(tokens.size() == 4); // check the owner is this node - if (tokens[1] != base::Conf::NodeName()) { + if (tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)] != + base::Conf::NodeName()) { LOG_ER("We do not own the lock.
Ignoring takeover request"); return TakeoverState::UNDEFINED; } - // expiration timestamp - const uint64_t expiration = strtoull( - tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), 0, - 10); - // size of the other network partition const uint64_t proposed_cluster_size = strtoull( tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_NETWORK_SIZE)] @@ -527,14 +505,13 @@ Consensus::TakeoverState Consensus::HandleTakeoverRequest( proposed_cluster_size, cluster_size); TakeoverState result; - if (CurrentTime() <= expiration && proposed_cluster_size > cluster_size) { + if (proposed_cluster_size > cluster_size) { result = TakeoverState::ACCEPTED; } else { result = TakeoverState::REJECTED; } rc = WriteTakeoverResult( - tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)], tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)], tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)], tokens[static_cast<std::uint8_t>( @@ -558,15 +535,3 @@ void Consensus::Split(const std::string& str, tokens.push_back(buffer); } } - -// seconds after epoch -uint64_t Consensus::CurrentTime() const { - auto now = std::chrono::system_clock::now(); - - // seconds since epoch - auto timestamp = - std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()) - .count(); - - return timestamp; -} diff --git a/src/osaf/consensus/consensus.h b/src/osaf/consensus/consensus.h index abd17612c..f6493bedc 100644 --- a/src/osaf/consensus/consensus.h +++ b/src/osaf/consensus/consensus.h @@ -26,7 +26,7 @@ class Consensus { public: // Set active controller to this node SaAisErrorT PromoteThisNode(const bool graceful_takeover, - const uint64_t cluster_size); + const uint64_t cluster_size); // Clear current active controller by releasing lock SaAisErrorT DemoteCurrentActive(); @@ -67,11 +67,10 @@ class Consensus { }; enum class TakeoverElements : std::uint8_t { - TIMESTAMP = 0, - CURRENT_OWNER = 1, - PROPOSED_OWNER = 2, - 
PROPOSED_NETWORK_SIZE = 3, - STATE = 4 + CURRENT_OWNER = 0, + PROPOSED_OWNER = 1, + PROPOSED_NETWORK_SIZE = 2, + STATE = 3 }; const std::string TakeoverStateStr[4] = {"UNDEFINED", "NEW", "ACCEPTED", @@ -84,11 +83,11 @@ class Consensus { bool use_remote_fencing_ = false; const std::string kTestKeyname = "opensaf_write_test"; const std::chrono::milliseconds kSleepInterval = - std::chrono::milliseconds(500); // in ms + std::chrono::milliseconds(500); // in ms static constexpr uint32_t kLockTimeout = 0; // lock is persistent by default static constexpr uint32_t kMaxTakeoverRetry = 20; static constexpr uint32_t kMaxRetry = 60; - static constexpr uint32_t kTakeoverValidTime = 20; // in seconds + static constexpr uint32_t kTakeoverValidTime = 15; // request TTL in seconds void CheckForExistingTakeoverRequest(); @@ -98,17 +97,17 @@ class Consensus { SaAisErrorT ReadTakeoverRequest(std::vector<std::string>& tokens); - SaAisErrorT WriteTakeoverResult(const std::string& timestamp, - const std::string& current_owner, + SaAisErrorT WriteTakeoverResult(const std::string& current_owner, const std::string& proposed_owner, const std::string& proposed_cluster_size, const TakeoverState result); + SaAisErrorT RemoveTakeoverRequest();  // erase takeover request, with retries + SaAisErrorT Demote(const std::string& node); bool FenceNode(const std::string& node); void Split(const std::string& str, std::vector<std::string>& tokens) const; - uint64_t CurrentTime() const; DELETE_COPY_AND_MOVE_OPERATORS(Consensus); }; diff --git a/src/osaf/consensus/key_value.cc b/src/osaf/consensus/key_value.cc index 80950e7cb..cf5c02213 100644 --- a/src/osaf/consensus/key_value.cc +++ b/src/osaf/consensus/key_value.cc @@ -91,13 +91,14 @@ SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value, } } -SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value) { +SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value, + const unsigned int timeout) { TRACE_ENTER(); const std::string kv_store_cmd = 
base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); const std::string command(kv_store_cmd + " create \"" + key + "\" \"" + - value + "\""); + value + "\" " + std::to_string(timeout)); std::string output; int rc = KeyValue::Execute(command, output); diff --git a/src/osaf/consensus/key_value.h b/src/osaf/consensus/key_value.h index b003629c0..af9717595 100644 --- a/src/osaf/consensus/key_value.h +++ b/src/osaf/consensus/key_value.h @@ -37,7 +37,8 @@ class KeyValue { const std::string& prev_value); // Create key, and set to value. Fails if key already exists. - static SaAisErrorT Create(const std::string& key, const std::string& value); + static SaAisErrorT Create(const std::string& key, const std::string& value, + const unsigned int timeout = 0); // Erase key static SaAisErrorT Erase(const std::string& key); diff --git a/src/osaf/consensus/plugins/etcd.plugin b/src/osaf/consensus/plugins/etcd.plugin index 6ed85ac92..b585bb657 100644 --- a/src/osaf/consensus/plugins/etcd.plugin +++ b/src/osaf/consensus/plugins/etcd.plugin @@ -67,6 +67,7 @@ setkey() { # params: # $1 - <key> # $2 - <value> +# $3 - <timeout> (seconds; passed to etcdctl --ttl) # returns: # 0 - success # 1 - already exists @@ -74,9 +75,10 @@ setkey() { create_key() { readonly key="$1" readonly value="$2" + readonly timeout="$3" if output=$(etcdctl $etcd_options --timeout $etcd_timeout mk "$directory$key" \ - "$value" 2>&1) + "$value" --ttl "$timeout" 2>&1) then return 0 else @@ -277,11 +279,11 @@ case "$1" in exit $? ;; create) - if [ "$#" -ne 3 ]; then - echo "Usage: $0 create <key> <value>" - exit 1 + if [ "$#" -ne 4 ]; then + echo "Usage: $0 create <key> <value> <timeout>" + exit 125 fi - create_key "$2" "$3" + create_key "$2" "$3" "$4" exit $?
;; erase) diff --git a/src/osaf/consensus/plugins/etcd3.plugin b/src/osaf/consensus/plugins/etcd3.plugin index 451440567..07fc9f2e1 100644 --- a/src/osaf/consensus/plugins/etcd3.plugin +++ b/src/osaf/consensus/plugins/etcd3.plugin @@ -33,10 +33,17 @@ export ETCDCTL_API=3 get() { readonly key="$1" - if value=$(etcdctl $etcd_options --dial-timeout $etcd_timeout get "$directory$key" | tail -n1) + if output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout get "$directory$key" 2>&1) then - echo "$value" - return 0 + key_=$(echo "$output" | tail -n2 | head -n1) + value=$(echo "$output" | tail -n1) + if [ "$key_" = "$directory$key" ]; then + echo "$value" + return 0 + else + # key missing! + return 1 + fi else return 1 fi @@ -69,6 +76,7 @@ setkey() { # params: # $1 - <key> # $2 - <value> +# $3 - <timeout> (in seconds; 0 means the key never expires) # returns: # 0 - success # 1 - already exists @@ -76,10 +84,25 @@ setkey() { create_key() { readonly key="$1" readonly value="$2" + readonly timeout="$3" + + if [ "$timeout" -gt 0 ]; then + # create a lease so that etcd deletes the key after $timeout seconds + if output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout lease grant "$timeout") + then + lease_id=$(echo "$output" | awk '{print $2}') + lease_param="--lease=$lease_id" + else + return 2 + fi + else + lease_param="" + fi + # first try to create the key transaction="create(\""$directory$key"\") = \"0\" - put \""$directory$key"\" \""$value"\" + put \""$directory$key"\" \""$value"\" "$lease_param" " output=$(etcdctl $etcd_options --dial-timeout $etcd_timeout txn <<< "$transaction") @@ -299,11 +322,11 @@ case "$1" in exit $? ;; create) - if [ "$#" -ne 3 ]; then - echo "Usage: $0 create <key> <value>" - exit 1 + if [ "$#" -ne 4 ]; then + echo "Usage: $0 create <key> <value> <timeout>" + exit 125 fi - create_key "$2" "$3" + create_key "$2" "$3" "$4" exit $?
;; erase) diff --git a/src/osaf/consensus/plugins/sample.plugin b/src/osaf/consensus/plugins/sample.plugin index 445cf8d84..6f6c71f6f 100644 --- a/src/osaf/consensus/plugins/sample.plugin +++ b/src/osaf/consensus/plugins/sample.plugin @@ -50,6 +50,7 @@ setkey() { # params: # $1 - <key> # $2 - <value> +# $3 - <timeout> (in seconds; 0 means the key never expires) # returns: # 0 - success # 1 - already exists @@ -57,6 +58,7 @@ setkey() { create_key() { readonly key="$1" readonly value="$2" + readonly timeout="$3" ... } @@ -169,11 +171,11 @@ case "$1" in exit $? ;; create) - if [ "$#" -ne 3 ]; then - echo "Usage: $0 create <key> <value>" - exit 1 + if [ "$#" -ne 4 ]; then + echo "Usage: $0 create <key> <value> <timeout>" + exit 125 fi - create_key "$2" "$3" + create_key "$2" "$3" "$4" exit $? ;; erase) -- 2.14.1 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel