- add create and set (if previous value matches) functions to KeyValue class - add Consensus::MonitorTakeoverRequest() function for use by RDE to answer takeover requests - add Consensus::CreateTakeoverRequest() - before a SC is promoted to active, it will create a takeover request in the KV store. An existing SC can reject the lock takeover --- src/osaf/consensus/consensus.cc | 435 ++++++++++++++++++++++++++++++++++------ src/osaf/consensus/consensus.h | 55 ++++- src/osaf/consensus/key_value.cc | 105 +++++++--- src/osaf/consensus/key_value.h | 19 +- 4 files changed, 511 insertions(+), 103 deletions(-)
diff --git a/src/osaf/consensus/consensus.cc b/src/osaf/consensus/consensus.cc index cc04f3518..f2245dd01 100644 --- a/src/osaf/consensus/consensus.cc +++ b/src/osaf/consensus/consensus.cc @@ -15,13 +15,17 @@ #include "osaf/consensus/consensus.h" #include <unistd.h> #include <climits> +#include <sstream> #include <thread> #include "base/conf.h" #include "base/getenv.h" #include "base/logtrace.h" #include "base/ncssysf_def.h" -SaAisErrorT Consensus::PromoteThisNode() { +const std::string Consensus::kTakeoverRequestKeyname = "takeover_request"; + +SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover, + const uint64_t cluster_size) { TRACE_ENTER(); SaAisErrorT rc; @@ -29,6 +33,10 @@ SaAisErrorT Consensus::PromoteThisNode() { return SA_AIS_OK; } + // check if there is an existing takeover request; we cannot + // attempt to lock until it has expired + CheckForExistingTakeoverRequest(); + uint32_t retries = 0; rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout); while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) { @@ -39,33 +47,30 @@ SaAisErrorT Consensus::PromoteThisNode() { } if (rc == SA_AIS_ERR_EXIST) { + // there's a chance the lock has been released since the lock attempt // get the current active controller - std::string current_active(""); - retries = 0; - rc = KeyValue::LockOwner(current_active); - while (rc != SA_AIS_OK && retries < kMaxRetry) { - ++retries; - std::this_thread::sleep_for(kSleepInterval); - rc = KeyValue::LockOwner(current_active); - } - if (rc != SA_AIS_OK) { - LOG_ER("Failed to get current lock owner. Will attempt to lock anyway"); + std::string current_active = CurrentActive(); + + if (current_active.empty() == true) { + LOG_WA("Failed to get current lock owner.
Will attempt to lock anyway"); } + bool take_over_request_created = false; LOG_NO("Current active controller is %s", current_active.c_str()); - // there's a chance the lock has been released since the lock attempt if (current_active.empty() == false) { - // remove current active controller's lock and fence it - retries = 0; - rc = KeyValue::Unlock(current_active); - while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) { - LOG_IN("Trying to unlock"); - ++retries; - std::this_thread::sleep_for(kSleepInterval); - rc = KeyValue::Unlock(current_active); + if (graceful_takeover == true) { + rc = CreateTakeoverRequest(current_active, base::Conf::NodeName(), + cluster_size); + if (rc != SA_AIS_OK) { + LOG_WA("Takeover request failed (%d)", rc); + return rc; + } + take_over_request_created = true; } + // remove current active controller's lock and fence it + rc = Demote(current_active); if (rc == SA_AIS_OK) { FenceNode(current_active); } else { @@ -82,6 +87,23 @@ SaAisErrorT Consensus::PromoteThisNode() { std::this_thread::sleep_for(kSleepInterval); rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout); } + + if (take_over_request_created == true) { + SaAisErrorT rc1; + + // remove takeover request + rc1 = KeyValue::Erase(kTakeoverRequestKeyname); + retries = 0; + while (rc1 != SA_AIS_OK && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc1 = KeyValue::Erase(kTakeoverRequestKeyname); + } + + if (rc1 != SA_AIS_OK) { + LOG_WA("Could not remove takeover request"); + } + } } if (rc == SA_AIS_OK) { @@ -93,43 +115,23 @@ SaAisErrorT Consensus::PromoteThisNode() { return rc; } -SaAisErrorT Consensus::Demote(const std::string& node = "") { +SaAisErrorT Consensus::Demote(const std::string& node) { TRACE_ENTER(); if (use_consensus_ == false) { return SA_AIS_OK; } - SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION; - uint32_t retries = 0; - - // check current active node - std::string current_active; - rc = KeyValue::LockOwner(current_active);
- while (rc != SA_AIS_OK && retries < kMaxRetry) { - ++retries; - std::this_thread::sleep_for(kSleepInterval); - rc = KeyValue::LockOwner(current_active); - } - - if (rc != SA_AIS_OK) { - LOG_ER("Failed to get lock owner"); - return rc; - } - - LOG_NO("Demoting %s as active controller", current_active.c_str()); + osafassert(node.empty() == false); - if (node.empty() == false && node != current_active) { - // node is not the current active controller! - osafassert(false); - } + SaAisErrorT rc; + uint32_t retries = 0; - retries = 0; - rc = KeyValue::Unlock(current_active); + rc = KeyValue::Unlock(node); while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) { LOG_IN("Trying to unlock"); ++retries; std::this_thread::sleep_for(kSleepInterval); - rc = KeyValue::Unlock(current_active); + rc = KeyValue::Unlock(node); } if (rc != SA_AIS_OK) { @@ -143,7 +145,17 @@ SaAisErrorT Consensus::Demote(const std::string& node = "") { SaAisErrorT Consensus::DemoteCurrentActive() { TRACE_ENTER(); - return Demote(); + if (use_consensus_ == false) { + return SA_AIS_OK; + } + // check current active node + std::string current_active = CurrentActive(); + if (current_active.empty() == true) { + LOG_ER("Failed to get lock owner"); + return SA_AIS_ERR_FAILED_OPERATION; + } + LOG_NO("Demoting %s as active controller", current_active.c_str()); + return Demote(current_active); } SaAisErrorT Consensus::DemoteThisNode() { @@ -151,9 +163,7 @@ SaAisErrorT Consensus::DemoteCurrentActive() { return Demote(base::Conf::NodeName()); } -bool Consensus::IsEnabled() const { - return use_consensus_; -} +bool Consensus::IsEnabled() const { return use_consensus_; } bool Consensus::IsWritable() const { TRACE_ENTER(); @@ -178,9 +188,7 @@ bool Consensus::IsWritable() const { } } -bool Consensus::IsRemoteFencingEnabled() const { - return use_remote_fencing_; -} +bool Consensus::IsRemoteFencingEnabled() const { return use_remote_fencing_; } std::string Consensus::CurrentActive() const { TRACE_ENTER(); @@ -188,7 +196,7 @@ std::string Consensus::CurrentActive() const
{ return ""; } - SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION; + SaAisErrorT rc; uint32_t retries = 0; std::string owner; @@ -212,7 +220,7 @@ Consensus::Consensus() { uint32_t split_brain_enable = base::GetEnv("FMS_SPLIT_BRAIN_PREVENTION", 0); std::string kv_store_cmd = base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING" , 0); + uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING", 0); if (split_brain_enable == 1 && kv_store_cmd.empty() == false) { use_consensus_ = true; @@ -228,16 +236,14 @@ Consensus::Consensus() { base::Conf::InitNodeName(); } -Consensus::~Consensus() { -} +Consensus::~Consensus() {} bool Consensus::FenceNode(const std::string& node) { if (use_remote_fencing_ == true) { LOG_WA("Fencing remote node %s", node.c_str()); // @todo currently passing UINT_MAX as node ID, since // we can't always obtain a valid node ID? - opensaf_reboot(UINT_MAX, node.c_str(), - "Fencing remote node"); + opensaf_reboot(UINT_MAX, node.c_str(), "Fencing remote node"); return true; } else { @@ -247,7 +253,7 @@ bool Consensus::FenceNode(const std::string& node) { } void Consensus::MonitorLock(ConsensusCallback callback, - const uint32_t user_defined) { + const uint32_t user_defined) { TRACE_ENTER(); if (use_consensus_ == false) { return; @@ -255,3 +261,312 @@ void Consensus::MonitorLock(ConsensusCallback callback, KeyValue::WatchLock(callback, user_defined); } + +void Consensus::MonitorTakeoverRequest(ConsensusCallback callback, + const uint32_t user_defined) { + TRACE_ENTER(); + if (use_consensus_ == false) { + return; + } + + KeyValue::Watch(kTakeoverRequestKeyname, callback, user_defined); +} + +void Consensus::CheckForExistingTakeoverRequest() { + SaAisErrorT rc; + std::vector<std::string> tokens; + rc = ReadTakeoverRequest(tokens); + + if (rc != SA_AIS_OK) { + return; + } + + // get expiration + const uint64_t expiration_timestamp = strtoull(
+ tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), + 0, 10); + + // wait until expiration is over + int64_t expiration = expiration_timestamp - CurrentTime(); + if (expiration > 0 && expiration <= kTakeoverValidTime) { + // @todo check if the takeover request has been deleted already? + LOG_NO("A takeover request is in progress" + " (expiring in %" PRId64 " seconds)", + expiration); + std::chrono::seconds sleep_duration(expiration); + std::this_thread::sleep_for(sleep_duration); + } else if (expiration > 0) { + LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp); + } +} + +SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner, + const std::string& proposed_owner, + const uint64_t cluster_size) { + TRACE_ENTER(); + + // Format of takeover request: + // "expiration_time<space>current_owner<space>proposed_owner<space> + // proposed_owner_cluster_size<space>status" + // status := [UNDEFINED, NEW, REJECTED, ACCEPTED] + + std::string takeover_request; + // request expires kTakeoverValidTime seconds from now + uint64_t timestamp = CurrentTime() + kTakeoverValidTime; + + takeover_request = + std::to_string(timestamp) + " " + current_owner + " " + + proposed_owner + " " + std::to_string(cluster_size) + " " + + TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)]; + + TRACE("Takeover request: \"%s\"", takeover_request.c_str()); + + SaAisErrorT rc; + uint32_t retries = 0; + rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request); + while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request); + } + + if (rc == SA_AIS_ERR_EXIST) { + LOG_NO("Existing takeover request found"); + + // retrieve takeover request + std::vector<std::string> tokens; + retries = 0; + rc = ReadTakeoverRequest(tokens); + while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) { + ++retries; +
std::this_thread::sleep_for(kSleepInterval); + rc = ReadTakeoverRequest(tokens); + } + + if (rc == SA_AIS_OK) { + // get expiration + const uint64_t expiration_timestamp = strtoull( + tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), + 0, 10); + + // wait until expiration is over + int64_t expiration = expiration_timestamp - CurrentTime(); + if (expiration > 0 && expiration <= kTakeoverValidTime) { + LOG_NO("A takeover request is in progress" + " (expiring in %" PRId64 " seconds)", + expiration); + std::chrono::seconds sleep_duration(expiration); + std::this_thread::sleep_for(sleep_duration); + } else if (expiration > 0) { + LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp); + } + } // else remove it anyway + + LOG_NO("Remove expired takeover request"); + + // remove expired request + retries = 0; + rc = KeyValue::Erase(kTakeoverRequestKeyname); + while (rc != SA_AIS_OK && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc = KeyValue::Erase(kTakeoverRequestKeyname); + } + + if (rc == SA_AIS_OK) { + return CreateTakeoverRequest(current_owner, proposed_owner, cluster_size); + } else { + LOG_ER("Could not remove existing takeover request"); + return SA_AIS_ERR_EXIST; + } + } + + // wait up to 10s for request to be answered + retries = 0; + while (retries < kMaxTakeoverRetry) { + std::vector<std::string> tokens; + if (ReadTakeoverRequest(tokens) == SA_AIS_OK) { + const std::string state = + tokens[static_cast<std::uint8_t>(TakeoverElements::STATE)]; + const std::string _proposed_owner = + tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)]; + + if (proposed_owner != _proposed_owner) { + LOG_ER("Takeover request was not created by us!
(%s)", + _proposed_owner.c_str()); + rc = SA_AIS_ERR_EXIST; + break; + } + + if (state == TakeoverStateStr[static_cast<std::uint8_t>( + TakeoverState::REJECTED)]) { + LOG_NO("Takeover request rejected"); + rc = SA_AIS_ERR_EXIST; + break; + } else if (state == TakeoverStateStr[static_cast<std::uint8_t>( + TakeoverState::ACCEPTED)]) { + LOG_NO("Takeover request accepted"); + rc = SA_AIS_OK; + break; + } else if (state == TakeoverStateStr[static_cast<std::uint8_t>( + TakeoverState::NEW)]) { + TRACE("Waiting for response to takeover request"); + // set result to OK, in case we do not get a reply + // within the allocated period. This will allow the lock to + // be removed by this node. Note: do not break out of the loop here! + rc = SA_AIS_OK; + } + } + ++retries; + std::this_thread::sleep_for(kSleepInterval); + } + + LOG_NO("Result: %d", rc); + return rc; +} + +SaAisErrorT Consensus::WriteTakeoverResult( + const std::string& timestamp, const std::string& current_owner, + const std::string& proposed_owner, const std::string& proposed_cluster_size, + const TakeoverState result) { + TRACE_ENTER(); + + const std::string takeover_request = + timestamp + " " + current_owner + " " + proposed_owner + " " + + proposed_cluster_size + " " + + TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)]; + + const std::string takeover_result = + timestamp + " " + current_owner + " " + proposed_owner + " " + + proposed_cluster_size + " " + + TakeoverStateStr[static_cast<std::uint8_t>(result)]; + + LOG_NO("TakeoverResult: %s", takeover_result.c_str()); + + SaAisErrorT rc; + + // previous value must match + rc = + KeyValue::Set(kTakeoverRequestKeyname, takeover_result, takeover_request); + + return rc; +} + +SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) { + TRACE_ENTER(); + + std::string request; + SaAisErrorT rc; + + rc = KeyValue::Get(kTakeoverRequestKeyname, request); + if (rc != SA_AIS_OK) { + // it doesn't always exist, don't log an error +
LOG_NO("Could not read takeover request (%d)", rc); + return SA_AIS_ERR_FAILED_OPERATION; + } + + if (request.empty() == true) { + // on node shutdown, this could be empty + return SA_AIS_ERR_UNAVAILABLE; + } + + Split(request, tokens); + if (tokens.size() != 5) { + LOG_ER("Invalid takeover request"); + return SA_AIS_ERR_LIBRARY; + } + + return SA_AIS_OK; +} + +Consensus::TakeoverState Consensus::HandleTakeoverRequest( + const uint64_t cluster_size) { + TRACE_ENTER(); + + if (use_consensus_ == false) { + return TakeoverState::UNDEFINED; + } + + SaAisErrorT rc; + uint32_t retries = 0; + std::vector<std::string> tokens; + + // get request from KV store + rc = ReadTakeoverRequest(tokens); + while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) { + ++retries; + std::this_thread::sleep_for(kSleepInterval); + rc = ReadTakeoverRequest(tokens); + } + + if (rc != SA_AIS_OK) { + return TakeoverState::UNDEFINED; + } + + // request is a space delimited string with 5 elements + osafassert(tokens.size() == 5); + // check the current lock owner is this node + if (tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)] != + base::Conf::NodeName()) { + LOG_ER("We do not own the lock.
Ignoring takeover request"); + return TakeoverState::UNDEFINED; + } + + // expiration timestamp + const uint64_t expiration = strtoull( + tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)].c_str(), 0, + 10); + + // size of the other network partition + const uint64_t proposed_cluster_size = strtoull( + tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_NETWORK_SIZE)] + .c_str(), + 0, 10); + + LOG_NO("Other network size: %" PRIu64 ", our network size: %" PRIu64, + proposed_cluster_size, cluster_size); + + TakeoverState result; + if (CurrentTime() <= expiration && proposed_cluster_size > cluster_size) { + result = TakeoverState::ACCEPTED; + } else { + result = TakeoverState::REJECTED; + } + + rc = WriteTakeoverResult( + tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)], + tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)], + tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)], + tokens[static_cast<std::uint8_t>( + TakeoverElements::PROPOSED_NETWORK_SIZE)], + result); + if (rc != SA_AIS_OK) { + LOG_ER("Unable to write takeover result (%d)", rc); + return TakeoverState::UNDEFINED; + } + + return result; +} + +// append the whitespace-delimited elements of str to tokens +void Consensus::Split(const std::string& str, + std::vector<std::string>& tokens) const { + std::stringstream stream(str); + std::string buffer; + + while (stream >> buffer) { + tokens.push_back(buffer); + } +} + +// current system_clock time, in whole seconds since the epoch +uint64_t Consensus::CurrentTime() const { + auto now = std::chrono::system_clock::now(); + + // seconds since epoch + auto timestamp = + std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()) + .count(); + + return timestamp; +} diff --git a/src/osaf/consensus/consensus.h b/src/osaf/consensus/consensus.h index 9a8aa7e9b..abd17612c 100644 --- a/src/osaf/consensus/consensus.h +++ b/src/osaf/consensus/consensus.h @@ -17,14 +17,16 @@ #include <chrono> #include <string> -#include "saAis.h" +#include
<vector> #include "base/macros.h" #include "osaf/consensus/key_value.h" +#include "saAis.h" class Consensus { public: // Set active controller to this node - SaAisErrorT PromoteThisNode(); + SaAisErrorT PromoteThisNode(const bool graceful_takeover, + const uint64_t cluster_size); // Clear current active controller by releasing lock SaAisErrorT DemoteCurrentActive(); @@ -40,6 +42,9 @@ class Consensus { // in the callback void MonitorLock(ConsensusCallback callback, const uint32_t user_defined); + void MonitorTakeoverRequest(ConsensusCallback callback, + const uint32_t user_defined); + // Is consensus service enabled? bool IsEnabled() const; @@ -52,17 +57,59 @@ class Consensus { Consensus(); virtual ~Consensus(); + static const std::string kTakeoverRequestKeyname; + + enum class TakeoverState : std::uint8_t { + UNDEFINED = 0, + NEW = 1, + ACCEPTED = 2, + REJECTED = 3, + }; + + enum class TakeoverElements : std::uint8_t { + TIMESTAMP = 0, + CURRENT_OWNER = 1, + PROPOSED_OWNER = 2, + PROPOSED_NETWORK_SIZE = 3, + STATE = 4 + }; + + const std::string TakeoverStateStr[4] = {"UNDEFINED", "NEW", "ACCEPTED", + "REJECTED"}; + + TakeoverState HandleTakeoverRequest(const uint64_t cluster_size); + private: bool use_consensus_ = false; bool use_remote_fencing_ = false; const std::string kTestKeyname = "opensaf_write_test"; const std::chrono::milliseconds kSleepInterval = - std::chrono::milliseconds(100); // in ms + std::chrono::milliseconds(500); // in ms static constexpr uint32_t kLockTimeout = 0; // lock is persistent by default - static constexpr uint32_t kMaxRetry = 600; + static constexpr uint32_t kMaxTakeoverRetry = 20; + static constexpr uint32_t kMaxRetry = 60; + static constexpr uint32_t kTakeoverValidTime = 20; // in seconds + + void CheckForExistingTakeoverRequest(); + + SaAisErrorT CreateTakeoverRequest(const std::string& current_owner, + const std::string& proposed_owner, + const uint64_t cluster_size); + + SaAisErrorT ReadTakeoverRequest(std::vector<std::string>&
tokens); + + SaAisErrorT WriteTakeoverResult(const std::string& timestamp, + const std::string& current_owner, + const std::string& proposed_owner, + const std::string& proposed_cluster_size, + const TakeoverState result); + SaAisErrorT Demote(const std::string& node); bool FenceNode(const std::string& node); + void Split(const std::string& str, std::vector<std::string>& tokens) const; + uint64_t CurrentTime() const; + DELETE_COPY_AND_MOVE_OPERATORS(Consensus); }; diff --git a/src/osaf/consensus/key_value.cc b/src/osaf/consensus/key_value.cc index dbf07520b..80950e7cb 100644 --- a/src/osaf/consensus/key_value.cc +++ b/src/osaf/consensus/key_value.cc @@ -45,9 +45,9 @@ int KeyValue::Execute(const std::string& command, std::string& output) { SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " get " + key); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " get \"" + key + "\""); int rc = KeyValue::Execute(command, value); TRACE("Read '%s'", value.c_str()); @@ -61,9 +61,10 @@ SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) { SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " set " + key + " " + value); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " set \"" + key + "\" \"" + value + + "\""); std::string output; int rc = KeyValue::Execute(command, output); @@ -74,12 +75,47 @@ SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value) { } } +SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value, + const
std::string& prev_value) { + TRACE_ENTER(); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " set_if_prev \"" + key + "\" \"" + + value + "\" \"" + prev_value + "\""); + std::string output; + int rc = KeyValue::Execute(command, output); + + if (rc == 0) { + return SA_AIS_OK; + } + return SA_AIS_ERR_FAILED_OPERATION; +} + +SaAisErrorT KeyValue::Create(const std::string& key, const std::string& value) { + TRACE_ENTER(); + + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " create \"" + key + "\" \"" + + value + "\""); + std::string output; + int rc = KeyValue::Execute(command, output); + + if (rc == 0) { + return SA_AIS_OK; + } else if (rc == 1) { + return SA_AIS_ERR_EXIST; + } else { + return SA_AIS_ERR_FAILED_OPERATION; + } +} + SaAisErrorT KeyValue::Erase(const std::string& key) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " erase " + key); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " erase \"" + key + "\""); std::string output; int rc = KeyValue::Execute(command, output); @@ -91,13 +127,13 @@ SaAisErrorT KeyValue::Erase(const std::string& key) { } SaAisErrorT KeyValue::Lock(const std::string& owner, - const unsigned int timeout) { + const unsigned int timeout) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " lock " + owner + " " + - std::to_string(timeout)); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " lock \"" + owner + "\" " + + std::to_string(timeout)); std::string output; int rc = KeyValue::Execute(command, output); @@ -105,8
+141,10 @@ SaAisErrorT KeyValue::Lock(const std::string& owner, return SA_AIS_OK; } else if (rc == 1) { // already locked + LOG_NO("Lock failed: %s", output.c_str()); return SA_AIS_ERR_EXIST; } else { + LOG_NO("Lock failed: %s", output.c_str()); return SA_AIS_ERR_TRY_AGAIN; } } @@ -114,15 +152,16 @@ SaAisErrorT KeyValue::Lock(const std::string& owner, SaAisErrorT KeyValue::Unlock(const std::string& owner) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " unlock " + owner); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " unlock \"" + owner + "\""); std::string output; int rc = Execute(command, output); if (rc == 0) { return SA_AIS_OK; } else if (rc == 1) { + LOG_NO("Unlock failed: %s", output.c_str()); LOG_ER("Lock is owned by another node"); return SA_AIS_ERR_INVALID_PARAM; } else { @@ -133,8 +172,8 @@ SaAisErrorT KeyValue::Unlock(const std::string& owner) { SaAisErrorT KeyValue::LockOwner(std::string& owner) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); const std::string command(kv_store_cmd + " lock_owner"); std::string output; int rc = KeyValue::Execute(command, output); @@ -145,23 +184,24 @@ SaAisErrorT KeyValue::LockOwner(std::string& owner) { return SA_AIS_OK; } + // NOTE: on failure owner holds plugin output (not a node name) for debugging + owner = output; return SA_AIS_ERR_FAILED_OPERATION; } namespace { static constexpr std::chrono::milliseconds kSleepInterval = - std::chrono::milliseconds(100); // in ms + std::chrono::milliseconds(100); // in ms static constexpr uint32_t kMaxRetry = 100; -void WatchKeyFunction(const std::string& key, - const ConsensusCallback& callback, - const uint32_t user_defined) { +void 
WatchKeyFunction(const std::string& key,
+ const ConsensusCallback& callback, + const uint32_t user_defined) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); - const std::string command(kv_store_cmd + " watch " + key); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string command(kv_store_cmd + " watch \"" + key + "\""); std::string value; uint32_t retries = 0; int rc; @@ -183,11 +223,11 @@ void WatchKeyFunction(const std::string& key, } void WatchLockFunction(const ConsensusCallback& callback, - const uint32_t user_defined) { + const uint32_t user_defined) { TRACE_ENTER(); - const std::string kv_store_cmd = base::GetEnv( - "FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); + const std::string kv_store_cmd = + base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", ""); const std::string command(kv_store_cmd + " watch_lock"); std::string value; uint32_t retries = 0; @@ -211,16 +251,15 @@ void WatchLockFunction(const ConsensusCallback& callback, } // namespace -void KeyValue::Watch(const std::string& key, - const ConsensusCallback callback, - const uint32_t user_defined) { +void KeyValue::Watch(const std::string& key, const ConsensusCallback callback, + const uint32_t user_defined) { std::thread t(WatchKeyFunction, key, callback, user_defined); t.detach(); return; } void KeyValue::WatchLock(const ConsensusCallback callback, - const uint32_t user_defined) { + const uint32_t user_defined) { std::thread t(WatchLockFunction, callback, user_defined); t.detach(); return; diff --git a/src/osaf/consensus/key_value.h b/src/osaf/consensus/key_value.h index 853f303ff..b003629c0 100644 --- a/src/osaf/consensus/key_value.h +++ b/src/osaf/consensus/key_value.h @@ -20,9 +20,9 @@ #include <thread> #include "saAis.h" -typedef std::function<void(const std::string& key, - const std::string& new_value, - const uint32_t user_defined)> ConsensusCallback; +typedef std::function<void(const std::string& key, const std::string& new_value, + const
uint32_t user_defined)> + ConsensusCallback; class KeyValue { public: @@ -32,13 +32,20 @@ class KeyValue { // Set key to value static SaAisErrorT Set(const std::string& key, const std::string& value); + // Set key to value only if its current value equals prev_value + static SaAisErrorT Set(const std::string& key, const std::string& value, + const std::string& prev_value); + + // Create key with value. Returns SA_AIS_ERR_EXIST if key already exists. + static SaAisErrorT Create(const std::string& key, const std::string& value); + // Erase key static SaAisErrorT Erase(const std::string& key); // Obtain lock, default timeout is 0 seconds (indefinite). If lock // is called when already locked, the timeout is extended static SaAisErrorT Lock(const std::string& owner, - const unsigned int timeout = 0); + const unsigned int timeout = 0); // Release lock static SaAisErrorT Unlock(const std::string& owner); @@ -48,11 +55,11 @@ class KeyValue { // starts a thread to watch key and call callback if values changes static void Watch(const std::string& key, ConsensusCallback callback, - const uint32_t user_defined); + const uint32_t user_defined); // starts a thread to watch the lock and call callback if is modified static void WatchLock(ConsensusCallback callback, - const uint32_t user_defined); + const uint32_t user_defined); // internal use static int Execute(const std::string& command, std::string& output); -- 2.14.1 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel