KUDU-2274. Shut down tombstoned replica when replacing it Failing to shut down a tombstoned replica after copying it can lead to unfortunate interleavings resulting in the replica ending up in an inconsistent state. This actually occurred in a test environment, although it proved very hard to reproduce.
This patch includes several changes in addition to shutting down tombstoned replicas before replacing them: * Remove the thread-safety properties of the ConsensusMetadata class. ConsensusMetadata doesn't need to be thread-safe, even though it is ref-counted, because it is required to be externally synchronized. This patch replaces the mutex with a DFAKE_MUTEX from the thread collision warner utility class in order to easily detect concurrent access due to buggy external synchronization. * Also improve destructor state checks in TabletReplica. * Fix another case of unlocked cmeta access by TSTabletManager. These fixes were verified by running tombstoned_voting-stress-test with 4 CPU stress threads on the dist-test cluster after applying only the ConsensusMetadata thread-safety portion of this patch, and then again with the unlocked access fix and shutdown portions of this patch. After removing the cmeta mutex only (186/200 failed): http://dist-test.cloudera.org/job?job_id=mpercy.1518077234.135005 This full patch (200/200 succeeded): http://dist-test.cloudera.org/job?job_id=mpercy.1518078690.66599 Change-Id: Ia8d086c3fba52826ebe0d3a44842d53ecb6a9265 Reviewed-on: http://gerrit.cloudera.org:8080/9246 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <aser...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/17f97531 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/17f97531 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/17f97531 Branch: refs/heads/master Commit: 17f97531e2e333836c55c8df0bbe27974005b4a6 Parents: d977d1c Author: Mike Percy <mpe...@apache.org> Authored: Tue Feb 6 19:05:41 2018 -0800 Committer: Mike Percy <mpe...@apache.org> Committed: Mon Feb 12 23:29:31 2018 +0000 ---------------------------------------------------------------------- src/kudu/consensus/consensus_meta.cc | 186 ++++++++++------------------- src/kudu/consensus/consensus_meta.h | 40 ++----- 
src/kudu/consensus/raft_consensus.cc | 4 +- src/kudu/consensus/raft_consensus.h | 2 +- src/kudu/tablet/tablet_replica.cc | 5 +- src/kudu/tserver/ts_tablet_manager.cc | 5 +- 6 files changed, 87 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/consensus/consensus_meta.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc index f112f94..8e6a6a6 100644 --- a/src/kudu/consensus/consensus_meta.cc +++ b/src/kudu/consensus/consensus_meta.cc @@ -53,216 +53,165 @@ using std::string; using strings::Substitute; int64_t ConsensusMetadata::current_term() const { - lock_guard<Mutex> l(lock_); - return current_term_unlocked(); -} - -int64_t ConsensusMetadata::current_term_unlocked() const { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); DCHECK(pb_.has_current_term()); return pb_.current_term(); } void ConsensusMetadata::set_current_term(int64_t term) { - lock_guard<Mutex> l(lock_); - set_current_term_unlocked(term); -} - -void ConsensusMetadata::set_current_term_unlocked(int64_t term) { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); DCHECK_GE(term, kMinimumTerm); pb_.set_current_term(term); } bool ConsensusMetadata::has_voted_for() const { - lock_guard<Mutex> l(lock_); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); return pb_.has_voted_for(); } -string ConsensusMetadata::voted_for() const { - lock_guard<Mutex> l(lock_); +const string& ConsensusMetadata::voted_for() const { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); DCHECK(pb_.has_voted_for()); return pb_.voted_for(); } void ConsensusMetadata::clear_voted_for() { - lock_guard<Mutex> l(lock_); - clear_voted_for_unlocked(); -} - -void ConsensusMetadata::clear_voted_for_unlocked() { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); 
pb_.clear_voted_for(); } void ConsensusMetadata::set_voted_for(const string& uuid) { - lock_guard<Mutex> l(lock_); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); DCHECK(!uuid.empty()); pb_.set_voted_for(uuid); } bool ConsensusMetadata::IsVoterInConfig(const string& uuid, RaftConfigState type) { - lock_guard<Mutex> l(lock_); - return IsRaftConfigVoter(uuid, config_unlocked(type)); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return IsRaftConfigVoter(uuid, GetConfig(type)); } bool ConsensusMetadata::IsMemberInConfig(const string& uuid, RaftConfigState type) { - lock_guard<Mutex> l(lock_); - return IsRaftConfigMember(uuid, config_unlocked(type)); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return IsRaftConfigMember(uuid, GetConfig(type)); } int ConsensusMetadata::CountVotersInConfig(RaftConfigState type) { - lock_guard<Mutex> l(lock_); - return CountVoters(config_unlocked(type)); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return CountVoters(GetConfig(type)); } int64_t ConsensusMetadata::GetConfigOpIdIndex(RaftConfigState type) { - lock_guard<Mutex> l(lock_); - return config_unlocked(type).opid_index(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return GetConfig(type).opid_index(); } -RaftConfigPB ConsensusMetadata::CommittedConfig() const { - lock_guard<Mutex> l(lock_); - return committed_config_unlocked(); +const RaftConfigPB& ConsensusMetadata::CommittedConfig() const { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return GetConfig(COMMITTED_CONFIG); } -const RaftConfigPB& ConsensusMetadata::config_unlocked(RaftConfigState type) const { +const RaftConfigPB& ConsensusMetadata::GetConfig(RaftConfigState type) const { switch (type) { - case ACTIVE_CONFIG: return active_config_unlocked(); - case COMMITTED_CONFIG: return committed_config_unlocked(); - case PENDING_CONFIG: return pending_config_unlocked(); + case ACTIVE_CONFIG: + if (has_pending_config_) { + return pending_config_; + } + DCHECK(pb_.has_committed_config()); + return pb_.committed_config(); + case 
COMMITTED_CONFIG: + DCHECK(pb_.has_committed_config()); + return pb_.committed_config(); + case PENDING_CONFIG: + CHECK(has_pending_config_) << LogPrefix() << "There is no pending config"; + return pending_config_; default: LOG(FATAL) << "Unknown RaftConfigState type: " << type; } } -const RaftConfigPB& ConsensusMetadata::committed_config_unlocked() const { - lock_.AssertAcquired(); - DCHECK(pb_.has_committed_config()); - return pb_.committed_config(); -} - void ConsensusMetadata::set_committed_config(const RaftConfigPB& config) { - lock_guard<Mutex> l(lock_); - set_committed_config_unlocked(config); -} - -void ConsensusMetadata::set_committed_config_unlocked(const RaftConfigPB& config) { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); *pb_.mutable_committed_config() = config; if (!has_pending_config_) { - UpdateActiveRoleUnlocked(); + UpdateActiveRole(); } } bool ConsensusMetadata::has_pending_config() const { - lock_guard<Mutex> l(lock_); - return has_pending_config_unlocked(); -} - -bool ConsensusMetadata::has_pending_config_unlocked() const { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); return has_pending_config_; } -RaftConfigPB ConsensusMetadata::PendingConfig() const { - lock_guard<Mutex> l(lock_); - return pending_config_unlocked(); -} - -const RaftConfigPB& ConsensusMetadata::pending_config_unlocked() const { - lock_.AssertAcquired(); - CHECK(has_pending_config_) << LogPrefix() << "There is no pending config"; - return pending_config_; +const RaftConfigPB& ConsensusMetadata::PendingConfig() const { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return GetConfig(PENDING_CONFIG);; } void ConsensusMetadata::clear_pending_config() { - lock_guard<Mutex> l(lock_); - clear_pending_config_unlocked(); -} - -void ConsensusMetadata::clear_pending_config_unlocked() { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); has_pending_config_ = false; pending_config_.Clear(); - UpdateActiveRoleUnlocked(); + 
UpdateActiveRole(); } void ConsensusMetadata::set_pending_config(const RaftConfigPB& config) { - lock_guard<Mutex> l(lock_); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); has_pending_config_ = true; pending_config_ = config; - UpdateActiveRoleUnlocked(); + UpdateActiveRole(); } -RaftConfigPB ConsensusMetadata::ActiveConfig() const { - lock_guard<Mutex> l(lock_); - return active_config_unlocked(); +const RaftConfigPB& ConsensusMetadata::ActiveConfig() const { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + return GetConfig(ACTIVE_CONFIG); } -const RaftConfigPB& ConsensusMetadata::active_config_unlocked() const { - lock_.AssertAcquired(); - if (has_pending_config_) { - return pending_config_unlocked(); - } - return committed_config_unlocked(); -} - -string ConsensusMetadata::leader_uuid() const { - lock_guard<Mutex> l(lock_); +const string& ConsensusMetadata::leader_uuid() const { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); return leader_uuid_; } -void ConsensusMetadata::set_leader_uuid(const string& uuid) { - lock_guard<Mutex> l(lock_); - set_leader_uuid_unlocked(uuid); -} - -void ConsensusMetadata::set_leader_uuid_unlocked(const string& uuid) { - lock_.AssertAcquired(); - leader_uuid_ = uuid; - UpdateActiveRoleUnlocked(); +void ConsensusMetadata::set_leader_uuid(string uuid) { + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + leader_uuid_ = std::move(uuid); + UpdateActiveRole(); } RaftPeerPB::Role ConsensusMetadata::active_role() const { - lock_guard<Mutex> l(lock_); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); return active_role_; } ConsensusStatePB ConsensusMetadata::ToConsensusStatePB() const { - lock_guard<Mutex> l(lock_); - return ToConsensusStatePBUnlocked(); -} - -ConsensusStatePB ConsensusMetadata::ToConsensusStatePBUnlocked() const { - lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); ConsensusStatePB cstate; cstate.set_current_term(pb_.current_term()); if (!leader_uuid_.empty()) { cstate.set_leader_uuid(leader_uuid_); } - 
*cstate.mutable_committed_config() = committed_config_unlocked(); - if (has_pending_config_unlocked()) { - *cstate.mutable_pending_config() = pending_config_unlocked(); + *cstate.mutable_committed_config() = CommittedConfig(); + if (has_pending_config_) { + *cstate.mutable_pending_config() = pending_config_; } return cstate; } void ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& cstate) { - lock_guard<Mutex> l(lock_); - if (cstate.current_term() > current_term_unlocked()) { - set_current_term_unlocked(cstate.current_term()); - clear_voted_for_unlocked(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + if (cstate.current_term() > current_term()) { + set_current_term(cstate.current_term()); + clear_voted_for(); } - set_leader_uuid_unlocked(""); - set_committed_config_unlocked(cstate.committed_config()); - clear_pending_config_unlocked(); + set_leader_uuid(""); + set_committed_config(cstate.committed_config()); + clear_pending_config(); } Status ConsensusMetadata::Flush(FlushMode flush_mode) { - lock_guard<Mutex> l(lock_); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); MAYBE_FAULT(FLAGS_fault_crash_before_cmeta_flush); SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing consensus metadata"); @@ -364,13 +313,8 @@ std::string ConsensusMetadata::LogPrefix() const { } void ConsensusMetadata::UpdateActiveRole() { - lock_guard<Mutex> l(lock_); - UpdateActiveRoleUnlocked(); -} - -void ConsensusMetadata::UpdateActiveRoleUnlocked() { - lock_.AssertAcquired(); - ConsensusStatePB cstate = ToConsensusStatePBUnlocked(); + DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_); + ConsensusStatePB cstate = ToConsensusStatePB(); active_role_ = GetConsensusRole(peer_uuid_, cstate); VLOG_WITH_PREFIX(1) << "Updating active role to " << RaftPeerPB::Role_Name(active_role_) << ". 
Consensus state: " << pb_util::SecureShortDebugString(cstate); http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/consensus/consensus_meta.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_meta.h b/src/kudu/consensus/consensus_meta.h index e30eb85..4766575 100644 --- a/src/kudu/consensus/consensus_meta.h +++ b/src/kudu/consensus/consensus_meta.h @@ -26,7 +26,7 @@ #include "kudu/consensus/quorum_util.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/util/mutex.h" +#include "kudu/gutil/threading/thread_collision_warner.h" namespace kudu { @@ -65,7 +65,7 @@ enum class ConsensusMetadataCreateMode { // the pending configuration if a pending configuration is set, otherwise the committed // configuration. // -// This class is thread-safe. +// This class is not thread-safe and requires external synchronization. class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { public: @@ -81,7 +81,7 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { // Accessors for voted_for. bool has_voted_for() const; - std::string voted_for() const; + const std::string& voted_for() const; void clear_voted_for(); void set_voted_for(const std::string& uuid); @@ -101,14 +101,14 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { int64_t GetConfigOpIdIndex(RaftConfigState type); // Accessors for committed configuration. - RaftConfigPB CommittedConfig() const; + const RaftConfigPB& CommittedConfig() const; void set_committed_config(const RaftConfigPB& config); // Returns whether a pending configuration is set. bool has_pending_config() const; // Returns the pending configuration if one is set. Otherwise, fires a DCHECK. - RaftConfigPB PendingConfig() const; + const RaftConfigPB& PendingConfig() const; // Set & clear the pending configuration. 
void clear_pending_config(); @@ -116,12 +116,11 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { // If a pending configuration is set, return it. // Otherwise, return the committed configuration. - RaftConfigPB ActiveConfig() const; - const RaftConfigPB& active_config_unlocked() const; + const RaftConfigPB& ActiveConfig() const; // Accessors for setting the active leader. - std::string leader_uuid() const; - void set_leader_uuid(const std::string& uuid); + const std::string& leader_uuid() const; + void set_leader_uuid(std::string uuid); // Returns the currently active role of the current node. RaftPeerPB::Role active_role() const; @@ -135,7 +134,7 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { ConsensusStatePB ToConsensusStatePB() const; // Merge the committed portion of the consensus state from the source node - // during remote bootstrap. + // during tablet copy. // // This method will clear any pending config change, replace the committed // consensus config with the one in 'cstate', and clear the currently @@ -205,26 +204,12 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { static Status DeleteOnDiskData(FsManager* fs_manager, const std::string& tablet_id); // Return the specified config. - const RaftConfigPB& config_unlocked(RaftConfigState type) const; - - const RaftConfigPB& committed_config_unlocked() const; - void set_committed_config_unlocked(const RaftConfigPB& config); - - int64_t current_term_unlocked() const; - void set_current_term_unlocked(int64_t term); - void clear_voted_for_unlocked(); - const RaftConfigPB& pending_config_unlocked() const; - bool has_pending_config_unlocked() const; - void clear_pending_config_unlocked(); - void set_leader_uuid_unlocked(const std::string& uuid); - - ConsensusStatePB ToConsensusStatePBUnlocked() const; + const RaftConfigPB& GetConfig(RaftConfigState type) const; std::string LogPrefix() const; // Updates the cached active role. 
void UpdateActiveRole(); - void UpdateActiveRoleUnlocked(); // Updates the cached on-disk size of the consensus metadata. Status UpdateOnDiskSize(); @@ -233,8 +218,9 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> { const std::string tablet_id_; const std::string peer_uuid_; - // Protects all of the mutable fields below. - mutable Mutex lock_; + // This fake mutex helps ensure that this ConsensusMetadata object stays + // externally synchronized. + DFAKE_MUTEX(fake_lock_); std::string leader_uuid_; // Leader of the current term (term == pb_.current_term). bool has_pending_config_; // Indicates whether there is an as-yet uncommitted http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 7a2017a..6a0d9c1 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -2309,7 +2309,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked( Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { DCHECK(lock_.is_locked()); DCHECK_EQ(RaftPeerPB::LEADER, cmeta_->active_role()); - RaftConfigPB active_config = cmeta_->ActiveConfig(); + const RaftConfigPB& active_config = cmeta_->ActiveConfig(); // Change the peers so that we're able to replicate messages remotely and // locally. 
The peer manager must be closed before updating the active config @@ -2914,7 +2914,7 @@ Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) { return Status::OK(); } -std::string RaftConsensus::GetVotedForCurrentTermUnlocked() const { +const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const { DCHECK(lock_.is_locked()); DCHECK(cmeta_->has_voted_for()); return cmeta_->voted_for(); http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index f77749f..cfc38c5 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -744,7 +744,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>, // Return replica's vote for the current term. // The vote must be set; use HasVotedCurrentTermUnlocked() to check. - std::string GetVotedForCurrentTermUnlocked() const; + const std::string& GetVotedForCurrentTermUnlocked() const; const ConsensusOptions& GetOptions() const; http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/tablet/tablet_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 6f74bdd..2fbd7ae 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -132,9 +132,8 @@ TabletReplica::TabletReplica( } TabletReplica::~TabletReplica() { - // We should either have called Shutdown(), or we should have never called - // Init(). - CHECK(!tablet_) + // We are required to call Shutdown() before destroying a TabletReplica. + CHECK(state_ == SHUTDOWN || state_ == FAILED) << "TabletReplica not fully shut down. 
State: " << TabletStatePB_Name(state_); } http://git-wip-us.apache.org/repos/asf/kudu/blob/17f97531/src/kudu/tserver/ts_tablet_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index 8325d67..1c10995 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -570,6 +570,9 @@ void TSTabletManager::RunTabletCopy( last_logged_opid->term()), TabletServerErrorPB::INVALID_CONFIG); } + // Shut down the old TabletReplica so that it is no longer allowed to + // mutate the ConsensusMetadata. + old_replica->Shutdown(); break; } case TABLET_DATA_READY: { @@ -959,7 +962,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica, // with a partially created tablet here? replica->SetBootstrapping(); s = BootstrapTablet(replica->tablet_metadata(), - cmeta->CommittedConfig(), + replica->consensus()->CommittedConfig(), scoped_refptr<clock::Clock>(server_->clock()), server_->mem_tracker(), server_->result_tracker(),