Repository: kudu Updated Branches: refs/heads/master 2c5c9d06b -> 34bb7f93b
[rebalancer] location-aware rebalancer (part 1/n) Added logic to identify and fix placement policy violations. Also, added unit tests to cover the new functionality. Change-Id: I1cdc3b57a782e6d043b1853600c97fe1f8630347 Reviewed-on: http://gerrit.cloudera.org:8080/11549 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <wdberke...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/81d0109b Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/81d0109b Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/81d0109b Branch: refs/heads/master Commit: 81d0109b461fc29c427c1fc81598c286318a95b9 Parents: 2c5c9d0 Author: Alexey Serbin <aser...@cloudera.com> Authored: Fri Sep 21 15:38:40 2018 -0700 Committer: Alexey Serbin <aser...@cloudera.com> Committed: Mon Oct 29 23:05:24 2018 +0000 ---------------------------------------------------------------------- src/kudu/tools/CMakeLists.txt | 2 + src/kudu/tools/ksck_results.h | 1 + src/kudu/tools/placement_policy_util-test.cc | 494 ++++++++++++++++++++++ src/kudu/tools/placement_policy_util.cc | 357 ++++++++++++++++ src/kudu/tools/placement_policy_util.h | 117 +++++ src/kudu/tools/rebalance_algo.h | 18 +- src/kudu/tools/rebalancer.h | 17 +- 7 files changed, 1001 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt index c13fa65..6e3ad21 100644 --- a/src/kudu/tools/CMakeLists.txt +++ b/src/kudu/tools/CMakeLists.txt @@ -89,6 +89,7 @@ target_link_libraries(ksck add_library(kudu_tools_rebalance rebalancer.cc rebalance_algo.cc + placement_policy_util.cc tool_replica_util.cc ) target_link_libraries(kudu_tools_rebalance @@ -179,6 +180,7 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test 
ADD_KUDU_TEST(kudu-ts-cli-test) ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test kudu) +ADD_KUDU_TEST(placement_policy_util-test) ADD_KUDU_TEST(rebalance-test) ADD_KUDU_TEST(rebalance_algo-test) ADD_KUDU_TEST(rebalancer_tool-test http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/ksck_results.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_results.h b/src/kudu/tools/ksck_results.h index b9cad6c..21a3112 100644 --- a/src/kudu/tools/ksck_results.h +++ b/src/kudu/tools/ksck_results.h @@ -142,6 +142,7 @@ int ServerHealthScore(KsckServerHealth sh); struct KsckServerHealthSummary { std::string uuid; std::string address; + std::string ts_location; boost::optional<std::string> version; KsckServerHealth health = KsckServerHealth::HEALTHY; Status status = Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc new file mode 100644 index 0000000..68266f3 --- /dev/null +++ b/src/kudu/tools/placement_policy_util-test.cc @@ -0,0 +1,494 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/tools/placement_policy_util.h" + +#include <cstdint> +#include <iostream> +#include <map> +#include <set> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/tools/rebalance_algo.h" +#include "kudu/tools/rebalancer.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" + +using std::map; +using std::ostream; +using std::ostringstream; +using std::set; +using std::string; +using std::unordered_map; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace tools { + +// TODO(aserbin): consider renaming the structures below XxxInfo --> Xxx + +// Information on a table. +struct TestTableInfo { + const string id; + const int replication_factor; + const vector<string> tablet_ids; +}; + +// Information on tablet servers in the same location. +struct TabletServerLocationInfo { + const string location; + const vector<string> ts_ids; +}; + +// Information on tablet replicas hosted by a tablet server. +struct TabletServerReplicasInfo { + const string ts_id; + const vector<string> tablet_ids; +}; + +// Describes a cluster with emphasis on the placement policy constraints. +struct TestClusterConfig { + // The input information on the test cluster. + const vector<TestTableInfo> tables_info; + const vector<TabletServerLocationInfo> ts_location_info; + const vector<TabletServerReplicasInfo> replicas_info; + + // The expected information on placement policy violations and moves + // to correct those. 
+ const vector<PlacementPolicyViolationInfo> reference_violations_info; + const vector<Rebalancer::ReplicaMove> reference_replicas_to_remove; +}; + +// Transform the definition of the test cluster into the ClusterLocalityInfo +// and TabletsPlacementInfo. +void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc, + ClusterLocalityInfo* cli, + TabletsPlacementInfo* tpi) { + // TODO(aserbin): add sanity checks on the results + ClusterLocalityInfo result_cli; + TabletsPlacementInfo result_tpi; + + for (const auto& table_info : tcc.tables_info) { + TableInfo info; + info.name = table_info.id; + info.replication_factor = table_info.replication_factor; + EmplaceOrDie(&result_tpi.tables_info, table_info.id, std::move(info)); + for (const auto& tablet_id : table_info.tablet_ids) { + EmplaceOrDie(&result_tpi.tablet_to_table_id, tablet_id, table_info.id); + } + } + + for (const auto& location_info : tcc.ts_location_info) { + const auto& location = location_info.location; + // Populate ts_uuids_by_location. + auto& ts_uuids = LookupOrEmplace(&result_cli.servers_by_location, + location, set<string>()); + ts_uuids.insert(location_info.ts_ids.begin(), location_info.ts_ids.end()); + for (const auto& ts_id : ts_uuids) { + EmplaceOrDie(&result_cli.location_by_ts_id, ts_id, location); + } + } + + for (const auto& replica_info : tcc.replicas_info) { + const auto& ts_location = FindOrDie(result_cli.location_by_ts_id, + replica_info.ts_id); + // Populate replica_num_by_ts_id. + EmplaceOrDie(&result_tpi.replica_num_by_ts_id, + replica_info.ts_id, replica_info.tablet_ids.size()); + for (const auto& tablet_id : replica_info.tablet_ids) { + // Populate tablets_info. + auto& tablet_info = LookupOrEmplace(&result_tpi.tablets_info, + tablet_id, TabletInfo()); + tablet_info.config_idx = 0; // hard-coded for this type of test + auto& ri = tablet_info.replicas_info; + // For these tests, the first tablet in the list is assigned leader role. 
+ ri.push_back(TabletReplicaInfo{ + replica_info.ts_id, + ri.empty() ? ReplicaRole::LEADER : ReplicaRole::FOLLOWER_VOTER }); + // Populate tablet_location_info. + auto& count_by_location = LookupOrEmplace( + &result_tpi.tablet_location_info, tablet_id, unordered_map<string, int>()); + ++LookupOrEmplace(&count_by_location, ts_location, 0); + } + } + *cli = std::move(result_cli); + *tpi = std::move(result_tpi); +} + +// TODO(aserbin): is it needed at all? +bool operator==(const PlacementPolicyViolationInfo& lhs, + const PlacementPolicyViolationInfo& rhs) { + return lhs.tablet_id == rhs.tablet_id && + lhs.majority_location == rhs.majority_location && + lhs.replicas_num_at_majority_location == + rhs.replicas_num_at_majority_location && + lhs.replication_factor == rhs.replication_factor; +} + +ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) { + s << "{tablet_id: " << info.tablet_id + << ", location: " << info.majority_location << "}"; + return s; +} + +bool operator==(const Rebalancer::ReplicaMove& lhs, + const Rebalancer::ReplicaMove& rhs) { + CHECK(lhs.ts_uuid_to.empty()); + CHECK(rhs.ts_uuid_to.empty()); + // The config_opid_idx field is ignored in tests for brevity. + return lhs.tablet_uuid == rhs.tablet_uuid && + lhs.ts_uuid_from == rhs.ts_uuid_from; +} + +ostream& operator<<(ostream& s, const Rebalancer::ReplicaMove& info) { + CHECK(info.ts_uuid_to.empty()); + s << "{tablet_id: " << info.tablet_uuid + << ", ts_id: " << info.ts_uuid_from << "}"; + return s; +} + +// The order of elements in the container of reported violations +// and the container of replica movements to fix the latter doesn't +// matter: they are independent by definition (because they are reported +// in a per-tablet way) and either container must not have multiple entries +// per tablet anyway.
For the ease of comparison with reference results, +// let's build ordered map out of those, where the key is tablet id +// and the value is the information on the violation or candidate replica +// movement. +typedef map<string, PlacementPolicyViolationInfo> PPVIMap; +typedef map<string, Rebalancer::ReplicaMove> ReplicaMovesMap; + +void ViolationInfoVectorToMap( + const vector<PlacementPolicyViolationInfo>& infos, + PPVIMap* result) { + PPVIMap ret; + for (const auto& info : infos) { + ASSERT_TRUE(EmplaceIfNotPresent(&ret, info.tablet_id, info)); + } + *result = std::move(ret); +} + +void ReplicaMoveVectorToMap( + const vector<Rebalancer::ReplicaMove>& infos, + ReplicaMovesMap* result) { + ReplicaMovesMap ret; + for (const auto& info : infos) { + ASSERT_TRUE(EmplaceIfNotPresent(&ret, info.tablet_uuid, info)); + } + *result = std::move(ret); +} + +void CheckEqual(const vector<PlacementPolicyViolationInfo>& lhs, + const vector<PlacementPolicyViolationInfo>& rhs) { + PPVIMap lhs_map; + NO_FATALS(ViolationInfoVectorToMap(lhs, &lhs_map)); + PPVIMap rhs_map; + NO_FATALS(ViolationInfoVectorToMap(rhs, &rhs_map)); + ASSERT_EQ(lhs_map, rhs_map); +} + +void CheckEqual(const vector<Rebalancer::ReplicaMove>& lhs, + const vector<Rebalancer::ReplicaMove>& rhs) { + ReplicaMovesMap lhs_map; + NO_FATALS(ReplicaMoveVectorToMap(lhs, &lhs_map)); + ReplicaMovesMap rhs_map; + NO_FATALS(ReplicaMoveVectorToMap(rhs, &rhs_map)); + ASSERT_EQ(lhs_map, rhs_map); +} + +// A shortcut for DetectPlacementPolicyViolations() followed by +// FindMovesToReimposePlacementPolicy(). 
+Status FindMovesToFixPolicyViolations( + const TabletsPlacementInfo& placement_info, + const ClusterLocalityInfo& locality_info, + vector<PlacementPolicyViolationInfo>* violations_info, + std::vector<Rebalancer::ReplicaMove>* replicas_to_remove) { + DCHECK(replicas_to_remove); + + vector<PlacementPolicyViolationInfo> violations; + RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &violations)); + + if (violations.empty()) { + // Nothing to do: no placement policy violations found. + if (violations_info) { + violations_info->clear(); + } + replicas_to_remove->clear(); + return Status::OK(); + } + RETURN_NOT_OK(FindMovesToReimposePlacementPolicy( + placement_info, locality_info, violations, replicas_to_remove)); + *violations_info = std::move(violations); + + return Status::OK(); +} + +class ClusterLocationTest : public ::testing::Test { + protected: + void RunTest(const vector<TestClusterConfig>& test_configs) { + for (auto idx = 0; idx < test_configs.size(); ++idx) { + SCOPED_TRACE(Substitute("test config index: $0", idx)); + const auto& cfg = test_configs[idx]; + + ClusterLocalityInfo cli; + TabletsPlacementInfo tpi; + ClusterConfigToClusterPlacementInfo(cfg, &cli, &tpi); + + vector<PlacementPolicyViolationInfo> violations; + vector<Rebalancer::ReplicaMove> moves; + ASSERT_OK(FindMovesToFixPolicyViolations(tpi, cli, &violations, &moves)); + + NO_FATALS(CheckEqual(cfg.reference_violations_info, violations)); + NO_FATALS(CheckEqual(cfg.reference_replicas_to_remove, moves)); + } + } +}; + +TEST_F(ClusterLocationTest, PlacementPolicyViolationsNone) { + const vector<TestClusterConfig> configs = { + // Single-replica tablets, all in one location: no violation to report. + { + { + { "T0", 1, { "t0", "t1", "t2", } }, + }, + { + { "L0", { "A", } }, + { "L1", { "B", } }, + { "L2", { "C", } }, + }, + { + { "A", { "t0", "t1", "t2", } }, + { "B", {} }, + { "C", {} }, + }, + }, + + // One RF=3 tablet, one replica per location: no violations to report. 
+ { + { + { "T0", 3, { "t0", } }, + }, + { + { "L0", { "A", } }, + { "L1", { "B", } }, + { "L2", { "C", } }, + }, + { + { "A", { "t0", } }, + { "B", { "t0", } }, + { "C", { "t0", } }, + }, + }, + }; + NO_FATALS(RunTest(configs)); +} + +TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) { + const vector<TestClusterConfig> configs = { + // One RF=3 table with one tablet, all the replicas in one of the three + // locations. In addition, one RF=1 table with one tablet has its single + // replica in the same location. + { + { + { "T0", 3, { "t0", } }, + { "X0", 1, { "x0", } }, + }, + { + { "L0", { "A", "B", "C", } }, + { "L1", { "D", } }, + { "L2", { "E", } }, + }, + { + { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", "x0", } }, + { "D", {} }, + { "E", {} }, + }, + { { "t0", "L0" }, }, + { { "t0", "C" }, } + }, + + // One RF=3 tablet, two locations. + { + { + { "T0", 3, { "t0", } }, + }, + { + { "L0", { "A", "B", } }, + { "L1", { "C", } }, + }, + { + { "A", { "t0", } }, + { "B", { "t0", } }, + { "C", { "t0", } }, + }, + { { "t0", "L0" }, }, + {}, + }, + + // One RF=3 tablet, majority of replicas in one of three locations. + { + { + { "T0", 3, { "t0", } }, + }, + { + { "L0", { "A", "B", } }, + { "L1", { "C", } }, + { "L2", { "D", } }, + }, + { + { "A", { "t0", } }, { "B", { "t0", } }, + { "C", { "t0", } }, + { "D", {} }, + }, + { { "t0", "L0" }, }, + { { "t0", "B" }, } + }, + }; + NO_FATALS(RunTest(configs)); +} + +TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) { + const vector<TestClusterConfig> configs = { + // Two tables: one of RF=3 and another of RF=1. For both two tablets of the + // former, the replica distribution violates the placement policy. 
+ { + { + { "T0", 3, { "t0", "t1", } }, + { "X0", 1, { "x0", "x1", } }, + }, + { + { "L0", { "A", "B", "C", } }, + { "L1", { "D", "E", } }, + { "L2", { "F", } }, + }, + { + { "A", { "t0", } }, { "B", { "t0", "x0", } }, { "C", { "t0", } }, + { "D", { "t1", "x1", } }, { "E", { "t1", } }, + { "F", { "t1", } }, + }, + { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "B" }, { "t1", "E" } } + }, + + // Four RF=3 tablets: the replica placement of two tablets is OK, + // but the placement of the two others' violates the placement policy. + { + { + { "T0", 3, { "t0", "t1", "t2", "t3", } }, + }, + { + { "L0", { "A", "B", "C", } }, + { "L1", { "D", "E", } }, + { "L2", { "F", } }, + }, + { + { "A", { "t0", "t2", } }, { "B", { "t0", "t3", } }, { "C", { "t0", } }, + { "D", { "t1", "t2", } }, { "E", { "t1", "t3", } }, + { "F", { "t1", "t2", "t3", } }, + }, + { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "B" }, { "t1", "E" } } + }, + }; + NO_FATALS(RunTest(configs)); +} + +// That's a scenario to verify how FixPlacementPolicyViolations() works when +// there isn't a candidate replica to move. In general (i.e. in case of RF +// higher than 3), the existence of more than two locations does not guarantee +// there is always a way to distribute tablet replicas across locations +// so that no location has the majority of replicas. +TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) { + const vector<TestClusterConfig> configs = { + // One RF=5 tablet with the distribution of its replica placement violating + // the placement policy. + { + { + { "T0", 5, { "t0", } }, + }, + { + { "L0", { "A", "B", "C", "D", } }, + { "L1", { "E", } }, + { "L2", { "F", } }, + }, + { + // Tablet server D doesn't host any replica of t0. + { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } }, + { "E", { "t0", } }, + { "F", { "t0", } }, + }, + { { "t0", "L0" }, }, + {}, + }, + // One RF=7 tablet with the distribution of its replica placement violating + // the placement policy. 
+ { + { + { "T0", 7, { "t0", } }, + }, + { + { "L0", { "A", "B", "C", "D", "E", } }, + { "L1", { "F", } }, + { "L2", { "G", } }, + { "L3", { "H", } }, + }, + { + // Tablet server E doesn't host any replica of t0. The idea is to + // verify that FindMovesToReimposePlacementPolicy() does not command + // moving a replica within the same location to E or from F, G, or H + // to F. + { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } }, + { "D", { "t0", } }, + { "F", { "t0", } }, + { "G", { "t0", } }, + { "H", { "t0", } }, + }, + { { "t0", "L0" }, }, + {}, + }, + }; + for (auto idx = 0; idx < configs.size(); ++idx) { + SCOPED_TRACE(Substitute("test config index: $0", idx)); + const auto& cfg = configs[idx]; + + ClusterLocalityInfo cli; + TabletsPlacementInfo tpi; + ClusterConfigToClusterPlacementInfo(cfg, &cli, &tpi); + + vector<PlacementPolicyViolationInfo> violations; + ASSERT_OK(DetectPlacementPolicyViolations(tpi, &violations)); + NO_FATALS(CheckEqual(cfg.reference_violations_info, violations)); + + vector<Rebalancer::ReplicaMove> moves; + auto s = FindMovesToReimposePlacementPolicy(tpi, cli, violations, &moves); + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + ASSERT_TRUE(moves.empty()); + } +} + +} // namespace tools +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc new file mode 100644 index 0000000..d6494eb --- /dev/null +++ b/src/kudu/tools/placement_policy_util.cc @@ -0,0 +1,357 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/tools/placement_policy_util.h" + +#include <cstdint> +#include <functional> +#include <map> +#include <ostream> +#include <set> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include <boost/optional/optional.hpp> +#include <glog/logging.h> + +#include "kudu/consensus/quorum_util.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/tools/ksck_results.h" +#include "kudu/tools/rebalance_algo.h" +#include "kudu/util/status.h" + +using std::multimap; +using std::string; +using std::unordered_map; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace tools { + +namespace { + +// Given the information on a placement policy violation for the specified +// tablet, find the best replica to mark with the REPLACE attribute. +Status FindBestReplicaToReplace( + const PlacementPolicyViolationInfo& info, + const ClusterLocalityInfo& locality_info, + const TabletsPlacementInfo& tablets_info, + Rebalancer::ReplicaMove* replica_to_replace) { + DCHECK(replica_to_replace); + + const auto& ts_id_by_location = locality_info.servers_by_location; + // If the cluster has a single location, it's impossible to move any replica + // to other location. 
+ if (ts_id_by_location.size() == 1) { + return Status::ConfigurationError(Substitute( + "cannot change tablet replica distribution to conform with " + "the placement policy constraints: cluster is configured to have " + "only one location '$0'", info.majority_location)); + } + + const auto& tablet_id = info.tablet_id; + + // With two locations and an odd replication factor, it's impossible to make + // the replica distribution conform with the placement policy constraints. + const auto& table_id = FindOrDie(tablets_info.tablet_to_table_id, tablet_id); + const auto& table_info = FindOrDie(tablets_info.tables_info, table_id); + if (ts_id_by_location.size() == 2 && table_info.replication_factor % 2 == 1) { + return Status::ConfigurationError(Substitute( + "tablet $0 (table name '$1'): replica distribution cannot conform " + "with the placement policy constraints since its replication " + "factor is odd ($2) and there are two locations in the cluster", + tablet_id, table_info.name, table_info.replication_factor)); + } + + const auto& location = info.majority_location; + const auto& per_location_replica_info = FindOrDie( + tablets_info.tablet_location_info, tablet_id); + + bool have_location_to_move_replica = false; + if (per_location_replica_info.size() < + locality_info.servers_by_location.size()) { + // Have at least one extra location without replicas of the tablet. + have_location_to_move_replica = true; + } else { + // Check if there is a location to host an extra replica of the tablet. + for (const auto& elem : per_location_replica_info) { + const auto& loc = elem.first; + const auto loc_replica_count = elem.second; + // Check whether the location can accommodate one more replica while not + // becoming 'the majority' location. + if (loc_replica_count + 1 >= + consensus::MajoritySize(table_info.replication_factor)) { + continue; + } + // The source location is among would-be-majority ones because it hosts + // the majority of replicas already.
+ CHECK_NE(location, loc) << Substitute("location '$0' must have " + "the majority of replicas", loc); + const auto& servers = FindOrDie(locality_info.servers_by_location, loc); + if (servers.size() > loc_replica_count) { + // There is an extra tablet server to host another replica in this + // location. + have_location_to_move_replica = true; + break; + } + } + } + if (!have_location_to_move_replica) { + return Status::NotFound(Substitute( + "there isn't a single candidate location to move replica of tablet $0 " + "from the majority location '$1'", tablet_id, info.majority_location)); + } + + // Identifiers of the tablet servers that host the candidate replicas to be + // kicked out. + vector<string> ts_id_candidates; + + // The identifier of the leader replica, if it's in the source location. + string ts_id_leader_replica; + + const auto& tablet_info = FindOrDie(tablets_info.tablets_info, tablet_id); + const auto& ts_at_location = FindOrDie(ts_id_by_location, location); + for (const auto& replica_info : tablet_info.replicas_info) { + const auto& ts_id = replica_info.ts_uuid; + if (!ContainsKey(ts_at_location, ts_id)) { + continue; + } + ts_id_candidates.emplace_back(ts_id); + if (replica_info.role == ReplicaRole::LEADER) { + DCHECK(ts_id_leader_replica.empty()); + ts_id_leader_replica = ts_id; + } + } + + CHECK(!ts_id_candidates.empty()) << Substitute( + "no replicas found to remove from location '$0' to fix placement policy " + "violation for tablet $1", location, tablet_id); + + // Build auxiliary map to find most loaded tablet servers. + const auto& replica_num_by_ts_id = tablets_info.replica_num_by_ts_id; + multimap<int32_t, string, std::greater<int32_t>> servers_by_replica_num; + for (const auto& ts_id : ts_id_candidates) { + const auto replica_num = FindOrDie(replica_num_by_ts_id, ts_id); + servers_by_replica_num.emplace(replica_num, ts_id); + } + + // Prefer most loaded tablet servers for the replica-to-remove candidates. 
+ string replica_id; + for (const auto& elem : servers_by_replica_num) { + replica_id = elem.second; + // Prefer non-leader replicas for the replica-to-remove candidates: removing + // a leader replica might require currently active clients to reconnect. + if (replica_id != ts_id_leader_replica) { + break; + } + } + CHECK(!replica_id.empty()); + + *replica_to_replace = { tablet_id, replica_id, "", tablet_info.config_idx }; + return Status::OK(); +} + +} // anonymous namespace + + +Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info, + TabletsPlacementInfo* info) { + DCHECK(info); + + unordered_map<string, TableInfo> tables_info; + for (const auto& table_summary : raw_info.table_summaries) { + const auto& table_id = table_summary.id; + TableInfo table_info{ table_summary.name, table_summary.replication_factor }; + EmplaceOrDie(&tables_info, table_id, std::move(table_info)); + } + + // Build utility map: tablet server identifier to location (it's used below). + unordered_map<string, string> location_by_ts_id; + for (const auto& summary : raw_info.tserver_summaries) { + const auto& ts_id = summary.uuid; + const auto& ts_location = summary.ts_location; + VLOG(1) << "found tserver " << ts_id + << " at location '" << ts_location << "'"; + EmplaceOrDie(&location_by_ts_id, ts_id, ts_location); + } + + decltype(TabletsPlacementInfo::replica_num_by_ts_id) replica_num_by_ts_id; + decltype(TabletsPlacementInfo::tablet_to_table_id) tablet_to_table_id; + decltype(TabletsPlacementInfo::tablets_info) tablets_info; + decltype(TabletsPlacementInfo::tablet_location_info) tablet_location_info; + for (const auto& tablet_summary : raw_info.tablet_summaries) { + const auto& tablet_id = tablet_summary.id; + if (tablet_summary.result != KsckCheckResult::HEALTHY) { + // TODO(aserbin): should this be reported as some transient condition + // to be taken into account? 
E.g., a tablet might be + // in process of copying data to a new replica to replace + // another replica which violates the placement policy. + VLOG(1) << Substitute("tablet $0: not considering replicas for movement " + "since the tablet's status is '$1'", + tablet_id, + KsckCheckResultToString(tablet_summary.result)); + continue; + } + EmplaceOrDie(&tablet_to_table_id, tablet_id, tablet_summary.table_id); + + TabletInfo tablet_info; + for (const auto& replica_info : tablet_summary.replicas) { + TabletReplicaInfo info; + info.ts_uuid = replica_info.ts_uuid; + if (replica_info.is_leader) { + info.role = ReplicaRole::LEADER; + } else { + info.role = replica_info.is_voter ? ReplicaRole::FOLLOWER_VOTER + : ReplicaRole::FOLLOWER_NONVOTER; + } + if (replica_info.is_leader && replica_info.consensus_state) { + const auto& cstate = *replica_info.consensus_state; + if (cstate.opid_index) { + tablet_info.config_idx = *cstate.opid_index; + } + } + ++LookupOrEmplace(&replica_num_by_ts_id, replica_info.ts_uuid, 0); + + // Populate ClusterLocationInfo::tablet_location_info. + auto& count_by_location = LookupOrEmplace(&tablet_location_info, + tablet_id, + unordered_map<string, int>()); + const auto& location = FindOrDie(location_by_ts_id, info.ts_uuid); + ++LookupOrEmplace(&count_by_location, location, 0); + tablet_info.replicas_info.emplace_back(std::move(info)); + } + EmplaceOrDie(&tablets_info, tablet_id, std::move(tablet_info)); + } + + TabletsPlacementInfo result_info; + result_info.tablets_info = std::move(tablets_info); + result_info.tables_info = std::move(tables_info); + result_info.tablet_to_table_id = std::move(tablet_to_table_id); + result_info.tablet_location_info = std::move(tablet_location_info); + result_info.replica_num_by_ts_id = std::move(replica_num_by_ts_id); + *info = std::move(result_info); + + return Status::OK(); +} + +// Search for violations of placement policy given the information on tablet +// replica distribution in the cluster. 
+Status DetectPlacementPolicyViolations( + const TabletsPlacementInfo& placement_info, + vector<PlacementPolicyViolationInfo>* result_info) { + DCHECK(result_info); + + // Information on tablets whose replicas violate placement policies. + vector<PlacementPolicyViolationInfo> info; + + for (const auto& tablet_loc_elem : placement_info.tablet_location_info) { + const auto& tablet_id = tablet_loc_elem.first; + const auto& tablet_loc_info = tablet_loc_elem.second; + + const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id); + const auto& table_info = FindOrDie(placement_info.tables_info, table_id); + const auto rep_factor = table_info.replication_factor; + + // Maximum number of replicas, per location. + int max_replicas_num = 0; + string max_replicas_location; + for (const auto& elem : tablet_loc_info) { + const auto& location = elem.first; + const auto replica_num = elem.second; + if (max_replicas_num < replica_num) { + max_replicas_num = replica_num; + max_replicas_location = location; + } + } + + CHECK_GE(rep_factor, 1); + DCHECK_GE(max_replicas_num, 1); + DCHECK(!max_replicas_location.empty()); + const auto majority_size = consensus::MajoritySize(rep_factor); + + // The idea behind the placement policies is to keep the majority + // of replicas alive if any single location fails. + bool is_policy_violated = false; + if (rep_factor % 2 == 0) { + // In case of RF=2*N, losing at least N replicas means losing + // the majority. + if (rep_factor / 2 <= max_replicas_num) { + is_policy_violated = true; + LOG(INFO) << Substitute( + "tablet $0: detected $1 of $2 replicas at location $3", + tablet_id, max_replicas_num, rep_factor, max_replicas_location); + } + } else if (rep_factor > 1 && majority_size <= max_replicas_num) { + // In case of RF=2*N+1, losing at least N+1 replicas means losing + // the majority. 
+ is_policy_violated = true; + LOG(INFO) << Substitute( + "tablet $0: detected majority of replicas ($1 of $2) at location $3", + tablet_id, max_replicas_num, rep_factor, max_replicas_location); + } + if (is_policy_violated) { + info.push_back({ tablet_id, max_replicas_location }); + } + } + + *result_info = std::move(info); + + return Status::OK(); +} + +// Find moves to correct policy violations. It's about finding the replicas to +// be marked with the REPLACE attribute: the catalog manager will do the rest +// to move the replicas elsewhere in accordance with the placement policies. +Status FindMovesToReimposePlacementPolicy( + const TabletsPlacementInfo& placement_info, + const ClusterLocalityInfo& locality_info, + const vector<PlacementPolicyViolationInfo>& violations_info, + std::vector<Rebalancer::ReplicaMove>* replicas_to_remove) { + DCHECK(replicas_to_remove); + if (violations_info.empty()) { + replicas_to_remove->clear(); + return Status::OK(); + } + + vector<Rebalancer::ReplicaMove> best_moves; + for (const auto& info : violations_info) { + // There might be more than one move in total to fix a placement policy + // violation for a tablet, but no more than one replica per tablet is moved + // between locations at once. The process of correcting placement violations + // is iterative: the number of iterations per tablet is limited by its + // replication factor (precisely, the upper limit is RF/2). + Rebalancer::ReplicaMove move; + const auto s = FindBestReplicaToReplace( + info, locality_info, placement_info, &move); + if (s.IsConfigurationError()) { + // Best effort to fix violations in case of misconfigured clusters. 
+ continue; + } + RETURN_NOT_OK(s); + best_moves.emplace_back(std::move(move)); + } + *replicas_to_remove = std::move(best_moves); + + return Status::OK(); +} + +} // namespace tools +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h new file mode 100644 index 0000000..e54848d --- /dev/null +++ b/src/kudu/tools/placement_policy_util.h @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <cstdint> +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/optional/optional.hpp> + +#include "kudu/tools/rebalancer.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace tools { + +// Below are the structures to describe Kudu entities such as tablet replicas, +// tablets, and tables from the perspective of the placement policy constraints. 
+ +// Defined in rebalance_algo.h. +struct ClusterLocalityInfo; + +// Role of a tablet replica in the tablet's Raft configuration. +enum class ReplicaRole { + LEADER, + FOLLOWER_VOTER, + FOLLOWER_NONVOTER, +}; + +// Information on a single tablet replica: the tablet server hosting it +// ('ts_uuid') and the replica's role. +struct TabletReplicaInfo { + std::string ts_uuid; + ReplicaRole role; +}; + +// Information on a tablet: its replicas and, if known, the index of its Raft +// configuration (presumably the config's OpId index -- confirm with the code +// populating it). +struct TabletInfo { + std::vector<TabletReplicaInfo> replicas_info; + boost::optional<int64_t> config_idx; // For CAS-like change of Raft configs. +}; + +// Information on a table: its name and replication factor. +struct TableInfo { + std::string name; + int replication_factor; +}; + +// Information to describe tablet replica distribution on the cluster +// from the perspective of the placement policy constraints. +struct TabletsPlacementInfo { + // Tablet replica distribution information among locations: + // tablet_id --> { loc0: k, loc1: l, ..., locN: m } + std::unordered_map<std::string, std::unordered_map<std::string, int>> + tablet_location_info; + + // tablet_id --> tablet information + std::unordered_map<std::string, TabletInfo> tablets_info; + + // table_id --> table information + std::unordered_map<std::string, TableInfo> tables_info; + + // Dictionary: mapping tablet_id into its table_id. + std::unordered_map<std::string, std::string> tablet_to_table_id; + + // tserver_id --> total number of tablet replicas at the tserver. + std::unordered_map<std::string, int> replica_num_by_ts_id; +}; + +// Convert ClusterRawInfo into TabletsPlacementInfo. +Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info, + TabletsPlacementInfo* info); + +// Information on a violation of the basic placement policy constraint. 
+// The basic constraint is: for any tablet, no location should contain +// the majority of the replicas of the tablet. +struct PlacementPolicyViolationInfo { + // The tablet whose replica placement violates the constraint. + std::string tablet_id; + // The location holding the majority of the tablet's replicas. + std::string majority_location; + // NOTE(review): the push_back() in DetectPlacementPolicyViolations() + // (placement_policy_util.cc) initializes only 'tablet_id' and + // 'majority_location', so the two fields below end up zero there; + // confirm whether they should be populated from the detected values. + int replication_factor; + int replicas_num_at_majority_location; +}; + +// Given the information on replica placement in the cluster, detect violations +// of the placement policy constraints and output that information into the +// 'result_info' out parameter. 
+Status DetectPlacementPolicyViolations( + const TabletsPlacementInfo& placement_info, + std::vector<PlacementPolicyViolationInfo>* result_info); + +// Given the information on tablet replica distribution for a cluster and +// a list of placement policy violations, find "best candidate" replicas to +// move, reinstating the placement policy for corresponding tablets. The +// function outputs at most one (the best) candidate replica per reported +// violation, even if multiple replicas have to be moved to bring replica +// distribution in compliance with the policy. Violations for which no +// candidate can be chosen because of a configuration error are skipped. +// It's assumed that the process of reimposing +// the placement policy for the cluster is iterative, so it's necessary to +// call DetectPlacementPolicyViolations() followed by +// FindMovesToReimposePlacementPolicy() multiple times, until no policy +// violations are reported or no candidates are found. +Status FindMovesToReimposePlacementPolicy( + const TabletsPlacementInfo& placement_info, + const ClusterLocalityInfo& locality_info, + const std::vector<PlacementPolicyViolationInfo>& violations_info, + std::vector<Rebalancer::ReplicaMove>* replicas_to_remove); + +} // namespace tools +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/rebalance_algo.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h index 1db0d07..e099051 100644 --- a/src/kudu/tools/rebalance_algo.h +++ b/src/kudu/tools/rebalance_algo.h @@ -19,7 +19,9 @@ #include <cstdint> #include <map> #include <random> +#include <set> #include <string> +#include <unordered_map> #include <vector> #include <gtest/gtest_prod.h> @@ -33,8 +35,9 @@ template <class T> class optional; namespace kudu { namespace tools { -// A map from a count of replicas to a server id. 
The "reversed" relationship -// facilitates finding the servers with the maximum and minimum counts. 
+// A map from a count of replicas to a server identifier. The "reversed" +// relationship facilitates finding the servers with the maximum and minimum +// replica counts. typedef std::multimap<int32_t, std::string> ServersByCountMap; // Balance information for a table. @@ -59,13 +62,22 @@ struct ClusterBalanceInfo { // skew between them decreases, or when the skew decreases. std::multimap<int32_t, TableBalanceInfo> table_info_by_skew; - // Mapping total replica count -> tablet server. + // Mapping total replica count -> tablet server identifier. // // The total replica count of a tablet server is defined as the total number // of replicas hosted on the tablet server. ServersByCountMap servers_by_total_replica_count; }; +// Assignment of tablet servers to locations, kept in two views: indexed by +// location and indexed by tablet server identifier. +struct ClusterLocalityInfo { + // Location-related information: distribution of tablet servers by locations. + // Mapping 'location' --> 'identifiers of tablet servers in the location'. + std::unordered_map<std::string, std::set<std::string>> servers_by_location; + + // Mapping 'tablet server identifier' --> 'location'. + std::unordered_map<std::string, std::string> location_by_ts_id; +}; + // A directive to move some replica of a table between two tablet servers. 
struct TableReplicaMove { std::string table_id; http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/rebalancer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h index 3f7f0b1..a6e4caf 100644 --- a/src/kudu/tools/rebalancer.h +++ b/src/kudu/tools/rebalancer.h @@ -28,14 +28,20 @@ #include <vector> #include <boost/optional/optional.hpp> -#include <gtest/gtest_prod.h> #include "kudu/client/shared_ptr.h" +#include "kudu/tools/ksck_results.h" #include "kudu/tools/rebalance_algo.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" namespace kudu { +namespace tools { +struct TabletsPlacementInfo; +} // namespace tools +} // namespace kudu + +namespace kudu { namespace client { class KuduClient; @@ -46,6 +52,13 @@ namespace tools { class Ksck; struct KsckResults; +// Subset of the fields from KsckResults which are relevant to the rebalancing. +struct ClusterRawInfo { + std::vector<KsckServerHealthSummary> tserver_summaries; + std::vector<KsckTableSummary> table_summaries; + std::vector<KsckTabletSummary> tablet_summaries; +}; + // A class implementing logic for Kudu cluster rebalancing. class Rebalancer { public: @@ -96,6 +109,7 @@ class Rebalancer { std::string tablet_uuid; std::string ts_uuid_from; std::string ts_uuid_to; + boost::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes }; enum class RunStatus { @@ -288,7 +302,6 @@ class Rebalancer { // Auxiliary Ksck object to get information on the cluster. std::shared_ptr<Ksck> ksck_; - }; } // namespace tools