Repository: kudu
Updated Branches:
  refs/heads/master 2c5c9d06b -> 34bb7f93b


[rebalancer] location-aware rebalancer (part 1/n)

Added logic to identify and fix placement policy violations.
Also, added unit tests to cover the new functionality.

Change-Id: I1cdc3b57a782e6d043b1853600c97fe1f8630347
Reviewed-on: http://gerrit.cloudera.org:8080/11549
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberke...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/81d0109b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/81d0109b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/81d0109b

Branch: refs/heads/master
Commit: 81d0109b461fc29c427c1fc81598c286318a95b9
Parents: 2c5c9d0
Author: Alexey Serbin <aser...@cloudera.com>
Authored: Fri Sep 21 15:38:40 2018 -0700
Committer: Alexey Serbin <aser...@cloudera.com>
Committed: Mon Oct 29 23:05:24 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/CMakeLists.txt                |   2 +
 src/kudu/tools/ksck_results.h                |   1 +
 src/kudu/tools/placement_policy_util-test.cc | 494 ++++++++++++++++++++++
 src/kudu/tools/placement_policy_util.cc      | 357 ++++++++++++++++
 src/kudu/tools/placement_policy_util.h       | 117 +++++
 src/kudu/tools/rebalance_algo.h              |  18 +-
 src/kudu/tools/rebalancer.h                  |  17 +-
 7 files changed, 1001 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index c13fa65..6e3ad21 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -89,6 +89,7 @@ target_link_libraries(ksck
 add_library(kudu_tools_rebalance
   rebalancer.cc
   rebalance_algo.cc
+  placement_policy_util.cc
   tool_replica_util.cc
 )
 target_link_libraries(kudu_tools_rebalance
@@ -179,6 +180,7 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test
 ADD_KUDU_TEST(kudu-ts-cli-test)
 ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
   kudu)
+ADD_KUDU_TEST(placement_policy_util-test)
 ADD_KUDU_TEST(rebalance-test)
 ADD_KUDU_TEST(rebalance_algo-test)
 ADD_KUDU_TEST(rebalancer_tool-test

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/ksck_results.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_results.h b/src/kudu/tools/ksck_results.h
index b9cad6c..21a3112 100644
--- a/src/kudu/tools/ksck_results.h
+++ b/src/kudu/tools/ksck_results.h
@@ -142,6 +142,7 @@ int ServerHealthScore(KsckServerHealth sh);
 struct KsckServerHealthSummary {
   std::string uuid;
   std::string address;
+  std::string ts_location;
   boost::optional<std::string> version;
   KsckServerHealth health = KsckServerHealth::HEALTHY;
   Status status = Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc 
b/src/kudu/tools/placement_policy_util-test.cc
new file mode 100644
index 0000000..68266f3
--- /dev/null
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/placement_policy_util.h"
+
+#include <cstdint>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/rebalance_algo.h"
+#include "kudu/tools/rebalancer.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::set;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+// TODO(aserbin): consider renaming the structures below XxxInfo --> Xxx
+
+// Information on a table.
+struct TestTableInfo {
+  const string id;
+  const int replication_factor;
+  const vector<string> tablet_ids;
+};
+
+// Information on tablet servers in the same location.
+struct TabletServerLocationInfo {
+  const string location;
+  const vector<string> ts_ids;
+};
+
+// Information on tablet replicas hosted by a tablet server.
+struct TabletServerReplicasInfo {
+  const string ts_id;
+  const vector<string> tablet_ids;
+};
+
+// Describes a cluster with emphasis on the placement policy constraints.
+struct TestClusterConfig {
+  // The input information on the test cluster.
+  const vector<TestTableInfo> tables_info;
+  const vector<TabletServerLocationInfo> ts_location_info;
+  const vector<TabletServerReplicasInfo> replicas_info;
+
+  // The expected information on placement policy violations and moves
+  // to correct those.
+  const vector<PlacementPolicyViolationInfo> reference_violations_info;
+  const vector<Rebalancer::ReplicaMove> reference_replicas_to_remove;
+};
+
+// Transform the definition of the test cluster into the ClusterLocalityInfo
+// and TabletsPlacementInfo.
+void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc,
+                                         ClusterLocalityInfo* cli,
+                                         TabletsPlacementInfo* tpi) {
+  // TODO(aserbin): add sanity checks on the results
+  ClusterLocalityInfo result_cli;
+  TabletsPlacementInfo result_tpi;
+
+  for (const auto& table_info : tcc.tables_info) {
+    TableInfo info;
+    info.name = table_info.id;
+    info.replication_factor = table_info.replication_factor;
+    EmplaceOrDie(&result_tpi.tables_info, table_info.id, std::move(info));
+    for (const auto& tablet_id : table_info.tablet_ids) {
+      EmplaceOrDie(&result_tpi.tablet_to_table_id, tablet_id, table_info.id);
+    }
+  }
+
+  for (const auto& location_info : tcc.ts_location_info) {
+    const auto& location = location_info.location;
+    // Populate ts_uuids_by_location.
+    auto& ts_uuids = LookupOrEmplace(&result_cli.servers_by_location,
+                                     location, set<string>());
+    ts_uuids.insert(location_info.ts_ids.begin(), location_info.ts_ids.end());
+    for (const auto& ts_id : ts_uuids) {
+      EmplaceOrDie(&result_cli.location_by_ts_id, ts_id, location);
+    }
+  }
+
+  for (const auto& replica_info : tcc.replicas_info) {
+    const auto& ts_location = FindOrDie(result_cli.location_by_ts_id,
+                                        replica_info.ts_id);
+    // Populate replica_num_by_ts_id.
+    EmplaceOrDie(&result_tpi.replica_num_by_ts_id,
+                 replica_info.ts_id, replica_info.tablet_ids.size());
+    for (const auto& tablet_id : replica_info.tablet_ids) {
+      // Populate tablets_info.
+      auto& tablet_info = LookupOrEmplace(&result_tpi.tablets_info,
+                                          tablet_id, TabletInfo());
+      tablet_info.config_idx = 0; // hard-coded for this type of test
+      auto& ri = tablet_info.replicas_info;
+      // For these tests, the first tablet in the list is assigned leader role.
+      ri.push_back(TabletReplicaInfo{
+          replica_info.ts_id,
+          ri.empty() ? ReplicaRole::LEADER : ReplicaRole::FOLLOWER_VOTER });
+      // Populate tablet_location_info.
+      auto& count_by_location = LookupOrEmplace(
+          &result_tpi.tablet_location_info, tablet_id, unordered_map<string, int>());
+      ++LookupOrEmplace(&count_by_location, ts_location, 0);
+    }
+  }
+  *cli = std::move(result_cli);
+  *tpi = std::move(result_tpi);
+}
+
+// TODO(aserbin): is it needed at all?
+bool operator==(const PlacementPolicyViolationInfo& lhs,
+                const PlacementPolicyViolationInfo& rhs) {
+  return lhs.tablet_id == rhs.tablet_id &&
+      lhs.majority_location == rhs.majority_location &&
+      lhs.replicas_num_at_majority_location ==
+          rhs.replicas_num_at_majority_location &&
+      lhs.replication_factor == rhs.replication_factor;
+}
+
+ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) {
+  s << "{tablet_id: " << info.tablet_id
+    << ", location: " << info.majority_location << "}";
+  return s;
+}
+
+bool operator==(const Rebalancer::ReplicaMove& lhs,
+                const Rebalancer::ReplicaMove& rhs) {
+  CHECK(lhs.ts_uuid_to.empty());
+  CHECK(rhs.ts_uuid_to.empty());
+  // The config_opid_idx field is ignored in tests for brevity.
+  return lhs.tablet_uuid == rhs.tablet_uuid &&
+      lhs.ts_uuid_from == rhs.ts_uuid_from;
+}
+
+ostream& operator<<(ostream& s, const Rebalancer::ReplicaMove& info) {
+  CHECK(info.ts_uuid_to.empty());
+  s << "{tablet_id: " << info.tablet_uuid
+    << ", ts_id: " << info.ts_uuid_from << "}";
+  return s;
+}
+
+// The order of elements in the container of reported violations
+// and the container of replica movements to fix the latter doesn't
+// matter: they are independent by definition (because they are reported
+// in per-tablet way) and either container must not have multiple entries
+// per tablet anyway. For the ease of comparison with reference results,
+// let's build ordered map out of those, where the key is tablet id
+// and the value is the information on the violation or candidate replica
+// movement.
+typedef map<string, PlacementPolicyViolationInfo> PPVIMap;
+typedef map<string, Rebalancer::ReplicaMove> ReplicaMovesMap;
+
+void ViolationInfoVectorToMap(
+    const vector<PlacementPolicyViolationInfo>& infos,
+    PPVIMap* result) {
+  PPVIMap ret;
+  for (const auto& info : infos) {
+    ASSERT_TRUE(EmplaceIfNotPresent(&ret, info.tablet_id, info));
+  }
+  *result = std::move(ret);
+}
+
+void ReplicaMoveVectorToMap(
+    const vector<Rebalancer::ReplicaMove>& infos,
+    ReplicaMovesMap* result) {
+  ReplicaMovesMap ret;
+  for (const auto& info : infos) {
+    ASSERT_TRUE(EmplaceIfNotPresent(&ret, info.tablet_uuid, info));
+  }
+  *result = std::move(ret);
+}
+
+void CheckEqual(const vector<PlacementPolicyViolationInfo>& lhs,
+                const vector<PlacementPolicyViolationInfo>& rhs) {
+  PPVIMap lhs_map;
+  NO_FATALS(ViolationInfoVectorToMap(lhs, &lhs_map));
+  PPVIMap rhs_map;
+  NO_FATALS(ViolationInfoVectorToMap(rhs, &rhs_map));
+  ASSERT_EQ(lhs_map, rhs_map);
+}
+
+void CheckEqual(const vector<Rebalancer::ReplicaMove>& lhs,
+                const vector<Rebalancer::ReplicaMove>& rhs) {
+  ReplicaMovesMap lhs_map;
+  NO_FATALS(ReplicaMoveVectorToMap(lhs, &lhs_map));
+  ReplicaMovesMap rhs_map;
+  NO_FATALS(ReplicaMoveVectorToMap(rhs, &rhs_map));
+  ASSERT_EQ(lhs_map, rhs_map);
+}
+
+// A shortcut for DetectPlacementPolicyViolations() followed by
+// FindMovesToReimposePlacementPolicy().
+Status FindMovesToFixPolicyViolations(
+    const TabletsPlacementInfo& placement_info,
+    const ClusterLocalityInfo& locality_info,
+    vector<PlacementPolicyViolationInfo>* violations_info,
+    std::vector<Rebalancer::ReplicaMove>* replicas_to_remove) {
+  DCHECK(replicas_to_remove);
+
+  vector<PlacementPolicyViolationInfo> violations;
+  RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &violations));
+
+  if (violations.empty()) {
+    // Nothing to do: no placement policy violations found.
+    if (violations_info) {
+      violations_info->clear();
+    }
+    replicas_to_remove->clear();
+    return Status::OK();
+  }
+  RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
+      placement_info, locality_info, violations, replicas_to_remove));
+  *violations_info = std::move(violations);
+
+  return Status::OK();
+}
+
+class ClusterLocationTest : public ::testing::Test {
+ protected:
+  void RunTest(const vector<TestClusterConfig>& test_configs) {
+    for (auto idx = 0; idx < test_configs.size(); ++idx) {
+      SCOPED_TRACE(Substitute("test config index: $0", idx));
+      const auto& cfg = test_configs[idx];
+
+      ClusterLocalityInfo cli;
+      TabletsPlacementInfo tpi;
+      ClusterConfigToClusterPlacementInfo(cfg, &cli, &tpi);
+
+      vector<PlacementPolicyViolationInfo> violations;
+      vector<Rebalancer::ReplicaMove> moves;
+      ASSERT_OK(FindMovesToFixPolicyViolations(tpi, cli, &violations, &moves));
+
+      NO_FATALS(CheckEqual(cfg.reference_violations_info, violations));
+      NO_FATALS(CheckEqual(cfg.reference_replicas_to_remove, moves));
+    }
+  }
+};
+
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsNone) {
+  const vector<TestClusterConfig> configs = {
+    // Single-replica tablets, all in one location: no violation to report.
+    {
+      {
+        { "T0", 1, { "t0", "t1", "t2", } },
+      },
+      {
+        { "L0", { "A", } },
+        { "L1", { "B", } },
+        { "L2", { "C", } },
+      },
+      {
+        { "A", { "t0", "t1", "t2", } },
+        { "B", {} },
+        { "C", {} },
+      },
+    },
+
+    // One RF=3 tablet, one replica per location: no violations to report.
+    {
+      {
+        { "T0", 3, { "t0", } },
+      },
+      {
+        { "L0", { "A", } },
+        { "L1", { "B", } },
+        { "L2", { "C", } },
+      },
+      {
+        { "A", { "t0", } },
+        { "B", { "t0", } },
+        { "C", { "t0", } },
+      },
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
+  const vector<TestClusterConfig> configs = {
+    // One RF=3 table with one tablet, all the replicas in one of the three
+    // locations. In addition, one RF=1 table with one tablet has its single
+    // replica in the same location.
+    {
+      {
+        { "T0", 3, { "t0", } },
+        { "X0", 1, { "x0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", } },
+        { "L2", { "E", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", "x0", } },
+        { "D", {} },
+        { "E", {} },
+      },
+      { { "t0", "L0" }, },
+      { { "t0", "C" }, }
+    },
+
+    // One RF=3 tablet, two locations.
+    {
+      {
+        { "T0", 3, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", } },
+        { "L1", { "C", } },
+      },
+      {
+        { "A", { "t0", } },
+        { "B", { "t0", } },
+        { "C", { "t0", } },
+      },
+      { { "t0", "L0" }, },
+      {},
+    },
+
+    // One RF=3 tablet, majority of replicas in one of three locations.
+    {
+      {
+        { "T0", 3, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", } },
+        { "L1", { "C", } },
+        { "L2", { "D", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } },
+        { "C", { "t0", } },
+        { "D", {} },
+      },
+      { { "t0", "L0" }, },
+      { { "t0", "B" }, }
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
+  const vector<TestClusterConfig> configs = {
+    // Two tables: one of RF=3 and another of RF=1. For both tablets of the
+    // former, the replica distribution violates the placement policy.
+    {
+      {
+        { "T0", 3, { "t0", "t1", } },
+        { "X0", 1, { "x0", "x1", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", } },
+        { "L2", { "F", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", "x0", } }, { "C", { "t0", } },
+        { "D", { "t1", "x1", } }, { "E", { "t1", } },
+        { "F", { "t1", } },
+      },
+      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "B" }, { "t1", "E" } }
+    },
+
+    // Four RF=3 tablets: the replica placement of two tablets is OK,
+    // but the placement of the other two violates the placement policy.
+    {
+      {
+        { "T0", 3, { "t0", "t1", "t2", "t3", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", } },
+        { "L2", { "F", } },
+      },
+      {
+        { "A", { "t0", "t2", } }, { "B", { "t0", "t3", } }, { "C", { "t0", } },
+        { "D", { "t1", "t2", } }, { "E", { "t1", "t3", } },
+        { "F", { "t1", "t2", "t3", } },
+      },
+      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "B" }, { "t1", "E" } }
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
+// That's a scenario to verify how FixPlacementPolicyViolations() works when
+// there isn't a candidate replica to move. In general (i.e. in case of RF
+// higher than 3), the existence of more than two locations does not guarantee
+// there is always a way to distribute tablet replicas across locations
+// so that no location has the majority of replicas.
+TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
+  const vector<TestClusterConfig> configs = {
+    // One RF=5 tablet with the distribution of its replica placement violating
+    // the placement policy.
+    {
+      {
+        { "T0", 5, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", "D", } },
+        { "L1", { "E", } },
+        { "L2", { "F", } },
+      },
+      {
+        // Tablet server D doesn't host any replica of t0.
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+        { "E", { "t0", } },
+        { "F", { "t0", } },
+      },
+      { { "t0", "L0" }, },
+      {},
+    },
+    // One RF=7 tablet with the distribution of its replica placement violating
+    // the placement policy.
+    {
+      {
+        { "T0", 7, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", "D", "E", } },
+        { "L1", { "F", } },
+        { "L2", { "G", } },
+        { "L3", { "H", } },
+      },
+      {
+        // Tablet server E doesn't host any replica of t0. The idea is to
+        // verify that FindMovesToReimposePlacementPolicy() does not command
+        // moving a replica within the same location to E or from F, G, or H
+        // to F.
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+            { "D", { "t0", } },
+        { "F", { "t0", } },
+        { "G", { "t0", } },
+        { "H", { "t0", } },
+      },
+      { { "t0", "L0" }, },
+      {},
+    },
+  };
+  for (auto idx = 0; idx < configs.size(); ++idx) {
+    SCOPED_TRACE(Substitute("test config index: $0", idx));
+    const auto& cfg = configs[idx];
+
+    ClusterLocalityInfo cli;
+    TabletsPlacementInfo tpi;
+    ClusterConfigToClusterPlacementInfo(cfg, &cli, &tpi);
+
+    vector<PlacementPolicyViolationInfo> violations;
+    ASSERT_OK(DetectPlacementPolicyViolations(tpi, &violations));
+    NO_FATALS(CheckEqual(cfg.reference_violations_info, violations));
+
+    vector<Rebalancer::ReplicaMove> moves;
+    auto s = FindMovesToReimposePlacementPolicy(tpi, cli, violations, &moves);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(moves.empty());
+  }
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc 
b/src/kudu/tools/placement_policy_util.cc
new file mode 100644
index 0000000..d6494eb
--- /dev/null
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/placement_policy_util.h"
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <ostream>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/ksck_results.h"
+#include "kudu/tools/rebalance_algo.h"
+#include "kudu/util/status.h"
+
+using std::multimap;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tools {
+
+namespace {
+
+// Given the information on a placement policy violation for the specified
+// tablet, find the best replica to mark with the REPLACE attribute.
+Status FindBestReplicaToReplace(
+    const PlacementPolicyViolationInfo& info,
+    const ClusterLocalityInfo& locality_info,
+    const TabletsPlacementInfo& tablets_info,
+    Rebalancer::ReplicaMove* replica_to_replace) {
+  DCHECK(replica_to_replace);
+
+  const auto& ts_id_by_location = locality_info.servers_by_location;
+  // If the cluster has a single location, it's impossible to move any replica
+  // to another location.
+  if (ts_id_by_location.size() == 1) {
+    return Status::ConfigurationError(Substitute(
+        "cannot change tablet replica distribution to conform with "
+        "the placement policy constraints: cluster is configured to have "
+        "only one location '$0'", info.majority_location));
+  }
+
+  const auto& tablet_id = info.tablet_id;
+
+  // If the total number of locations is 2 and the replication factor is odd,
+  // the tablet's replica distribution cannot conform with the policy.
+  const auto& table_id = FindOrDie(tablets_info.tablet_to_table_id, tablet_id);
+  const auto& table_info = FindOrDie(tablets_info.tables_info, table_id);
+  if (ts_id_by_location.size() == 2 && table_info.replication_factor % 2 == 1) {
+    return Status::ConfigurationError(Substitute(
+        "tablet $0 (table name '$1'): replica distribution cannot conform "
+        "with the placement policy constraints since its replication "
+        "factor is odd ($2) and there are two locations in the cluster",
+        tablet_id, table_info.name, table_info.replication_factor));
+  }
+
+  const auto& location = info.majority_location;
+  const auto& per_location_replica_info = FindOrDie(
+      tablets_info.tablet_location_info, tablet_id);
+
+  bool have_location_to_move_replica = false;
+  if (per_location_replica_info.size() <
+      locality_info.servers_by_location.size()) {
+    // Have at least one extra location without replicas of the tablet.
+    have_location_to_move_replica = true;
+  } else {
+    // Check if there is a location to host an extra replica of the tablet.
+    for (const auto& elem : per_location_replica_info) {
+      const auto& loc = elem.first;
+      const auto loc_replica_count = elem.second;
+      // Check whether the location can accommodate one more replica while not
+      // becoming 'the majority' location.
+      if (loc_replica_count + 1 >=
+          consensus::MajoritySize(table_info.replication_factor)) {
+        continue;
+      }
+      // The source location is among would-be-majority ones because it hosts
+      // the majority of replicas already.
+      CHECK_NE(location, loc) << Substitute("location '$0' must have "
+                                            "the majority of replicas", loc);
+      const auto& servers = FindOrDie(locality_info.servers_by_location, loc);
+      if (servers.size() > loc_replica_count) {
+        // There is an extra tablet server to host another replica in this
+        // location.
+        have_location_to_move_replica = true;
+        break;
+      }
+    }
+  }
+  if (!have_location_to_move_replica) {
+    return Status::NotFound(Substitute(
+        "there isn't a single candidate location to move replica of tablet $0 "
+        "from the majority location '$1'", tablet_id, info.majority_location));
+  }
+
+  // Identifiers of the tablet servers that host the candidate replicas to be
+  // kicked out.
+  vector<string> ts_id_candidates;
+
+  // The identifier of the leader replica, if it's in the source location.
+  string ts_id_leader_replica;
+
+  const auto& tablet_info = FindOrDie(tablets_info.tablets_info, tablet_id);
+  const auto& ts_at_location = FindOrDie(ts_id_by_location, location);
+  for (const auto& replica_info : tablet_info.replicas_info) {
+    const auto& ts_id = replica_info.ts_uuid;
+    if (!ContainsKey(ts_at_location, ts_id)) {
+      continue;
+    }
+    ts_id_candidates.emplace_back(ts_id);
+    if (replica_info.role == ReplicaRole::LEADER) {
+      DCHECK(ts_id_leader_replica.empty());
+      ts_id_leader_replica = ts_id;
+    }
+  }
+
+  CHECK(!ts_id_candidates.empty()) << Substitute(
+      "no replicas found to remove from location '$0' to fix placement policy "
+      "violation for tablet $1", location, tablet_id);
+
+  // Build auxiliary map to find most loaded tablet servers.
+  const auto& replica_num_by_ts_id = tablets_info.replica_num_by_ts_id;
+  multimap<int32_t, string, std::greater<int32_t>> servers_by_replica_num;
+  for (const auto& ts_id : ts_id_candidates) {
+    const auto replica_num = FindOrDie(replica_num_by_ts_id, ts_id);
+    servers_by_replica_num.emplace(replica_num, ts_id);
+  }
+
+  // Prefer most loaded tablet servers for the replica-to-remove candidates.
+  string replica_id;
+  for (const auto& elem : servers_by_replica_num) {
+    replica_id = elem.second;
+    // Prefer non-leader replicas for the replica-to-remove candidates: removing
+    // a leader replica might require currently active clients to reconnect.
+    if (replica_id != ts_id_leader_replica) {
+      break;
+    }
+  }
+  CHECK(!replica_id.empty());
+
+  *replica_to_replace = { tablet_id, replica_id, "", tablet_info.config_idx };
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+
+Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
+                                 TabletsPlacementInfo* info) {
+  DCHECK(info);
+
+  unordered_map<string, TableInfo> tables_info;
+  for (const auto& table_summary : raw_info.table_summaries) {
+    const auto& table_id = table_summary.id;
+    TableInfo table_info{ table_summary.name, table_summary.replication_factor };
+    EmplaceOrDie(&tables_info, table_id, std::move(table_info));
+  }
+
+  // Build utility map: tablet server identifier to location (it's used below).
+  unordered_map<string, string> location_by_ts_id;
+  for (const auto& summary : raw_info.tserver_summaries) {
+    const auto& ts_id = summary.uuid;
+    const auto& ts_location = summary.ts_location;
+    VLOG(1) << "found tserver " << ts_id
+            << " at location '" << ts_location << "'";
+    EmplaceOrDie(&location_by_ts_id, ts_id, ts_location);
+  }
+
+  decltype(TabletsPlacementInfo::replica_num_by_ts_id) replica_num_by_ts_id;
+  decltype(TabletsPlacementInfo::tablet_to_table_id) tablet_to_table_id;
+  decltype(TabletsPlacementInfo::tablets_info) tablets_info;
+  decltype(TabletsPlacementInfo::tablet_location_info) tablet_location_info;
+  for (const auto& tablet_summary : raw_info.tablet_summaries) {
+    const auto& tablet_id = tablet_summary.id;
+    if (tablet_summary.result != KsckCheckResult::HEALTHY) {
+      // TODO(aserbin): should this be reported as some transient condition
+      //                to be taken into account? E.g., a tablet might be
+      //                in process of copying data to a new replica to replace
+      //                another replica which violates the placement policy.
+      VLOG(1) << Substitute("tablet $0: not considering replicas for movement "
+                            "since the tablet's status is '$1'",
+                            tablet_id,
+                            KsckCheckResultToString(tablet_summary.result));
+      continue;
+    }
+    EmplaceOrDie(&tablet_to_table_id, tablet_id, tablet_summary.table_id);
+
+    TabletInfo tablet_info;
+    for (const auto& replica_info : tablet_summary.replicas) {
+      TabletReplicaInfo info;
+      info.ts_uuid = replica_info.ts_uuid;
+      if (replica_info.is_leader) {
+        info.role = ReplicaRole::LEADER;
+      } else {
+        info.role = replica_info.is_voter ? ReplicaRole::FOLLOWER_VOTER
+                                          : ReplicaRole::FOLLOWER_NONVOTER;
+      }
+      if (replica_info.is_leader && replica_info.consensus_state) {
+        const auto& cstate = *replica_info.consensus_state;
+        if (cstate.opid_index) {
+          tablet_info.config_idx = *cstate.opid_index;
+        }
+      }
+      ++LookupOrEmplace(&replica_num_by_ts_id, replica_info.ts_uuid, 0);
+
+      // Populate ClusterLocationInfo::tablet_location_info.
+      auto& count_by_location = LookupOrEmplace(&tablet_location_info,
+                                                tablet_id,
+                                                unordered_map<string, int>());
+      const auto& location = FindOrDie(location_by_ts_id, info.ts_uuid);
+      ++LookupOrEmplace(&count_by_location, location, 0);
+      tablet_info.replicas_info.emplace_back(std::move(info));
+    }
+    EmplaceOrDie(&tablets_info, tablet_id, std::move(tablet_info));
+  }
+
+  TabletsPlacementInfo result_info;
+  result_info.tablets_info = std::move(tablets_info);
+  result_info.tables_info = std::move(tables_info);
+  result_info.tablet_to_table_id = std::move(tablet_to_table_id);
+  result_info.tablet_location_info = std::move(tablet_location_info);
+  result_info.replica_num_by_ts_id = std::move(replica_num_by_ts_id);
+  *info = std::move(result_info);
+
+  return Status::OK();
+}
+
+// Search for violations of placement policy given the information on tablet
+// replica distribution in the cluster.
+Status DetectPlacementPolicyViolations(
+    const TabletsPlacementInfo& placement_info,
+    vector<PlacementPolicyViolationInfo>* result_info) {
+  DCHECK(result_info);
+
+  // Information on tablets whose replicas violate placement policies.
+  vector<PlacementPolicyViolationInfo> info;
+
+  for (const auto& tablet_loc_elem : placement_info.tablet_location_info) {
+    const auto& tablet_id = tablet_loc_elem.first;
+    const auto& tablet_loc_info = tablet_loc_elem.second;
+
+    const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
+    const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+    const auto rep_factor = table_info.replication_factor;
+
+    // Maximum number of replicas, per location.
+    int max_replicas_num = 0;
+    string max_replicas_location;
+    for (const auto& elem : tablet_loc_info) {
+      const auto& location = elem.first;
+      const auto replica_num = elem.second;
+      if (max_replicas_num < replica_num) {
+        max_replicas_num = replica_num;
+        max_replicas_location = location;
+      }
+    }
+
+    CHECK_GE(rep_factor, 1);
+    DCHECK_GE(max_replicas_num, 1);
+    DCHECK(!max_replicas_location.empty());
+    const auto majority_size = consensus::MajoritySize(rep_factor);
+
+    // The idea behind the placement policies is to keep the majority
+    // of replicas alive if any single location fails.
+    bool is_policy_violated = false;
+    if (rep_factor % 2 == 0) {
+      // In case of RF=2*N, losing at least N replicas means losing
+      // the majority.
+      if (rep_factor / 2 <= max_replicas_num) {
+        is_policy_violated = true;
+        LOG(INFO) << Substitute(
+            "tablet $0: detected $1 of $2 replicas at location $3",
+            tablet_id, max_replicas_num, rep_factor, max_replicas_location);
+      }
+    } else if (rep_factor > 1 && majority_size <= max_replicas_num) {
+      // In case of RF=2*N+1, losing at least N+1 replicas means losing
+      // the majority.
+      is_policy_violated = true;
+      LOG(INFO) << Substitute(
+          "tablet $0: detected majority of replicas ($1 of $2) at location $3",
+          tablet_id, max_replicas_num, rep_factor, max_replicas_location);
+    }
+    if (is_policy_violated) {
+      info.push_back({ tablet_id, max_replicas_location });
+    }
+  }
+
+  *result_info = std::move(info);
+
+  return Status::OK();
+}
+
+// Find moves to correct policy violations. It's about finding the replicas to
+// be marked with the REPLACE attribute: the catalog manager will do the rest
+// to move the replicas elsewhere in accordance with the placement policies.
+Status FindMovesToReimposePlacementPolicy(
+    const TabletsPlacementInfo& placement_info,
+    const ClusterLocalityInfo& locality_info,
+    const vector<PlacementPolicyViolationInfo>& violations_info,
+    std::vector<Rebalancer::ReplicaMove>* replicas_to_remove) {
+  DCHECK(replicas_to_remove);
+  if (violations_info.empty()) {
+    replicas_to_remove->clear();
+    return Status::OK();
+  }
+
+  vector<Rebalancer::ReplicaMove> best_moves;
+  for (const auto& info : violations_info) {
+    // There might be more than one move in total to fix a placement policy
+    // violation for a tablet, but no more than one replica per tablet is moved
+    // between locations at once. The process of correcting placement violations
+    // is iterative: the number of iterations per tablet is limited by its
+    // replication factor (precisely, the upper limit is RF/2).
+    Rebalancer::ReplicaMove move;
+    const auto s = FindBestReplicaToReplace(
+        info, locality_info, placement_info, &move);
+    if (s.IsConfigurationError()) {
+      // Best effort to fix violations in case of misconfigured clusters.
+      continue;
+    }
+    RETURN_NOT_OK(s);
+    best_moves.emplace_back(std::move(move));
+  }
+  *replicas_to_remove = std::move(best_moves);
+
+  return Status::OK();
+}
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/placement_policy_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.h 
b/src/kudu/tools/placement_policy_util.h
new file mode 100644
index 0000000..e54848d
--- /dev/null
+++ b/src/kudu/tools/placement_policy_util.h
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/tools/rebalancer.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace tools {
+
+// Below are the structures to describe Kudu entities such as tablet replicas,
+// tablets, and tables from the perspective of the placement policy constraints.
+
+struct ClusterLocalityInfo;
+
+enum class ReplicaRole {
+  LEADER,
+  FOLLOWER_VOTER,
+  FOLLOWER_NONVOTER,
+};
+
+struct TabletReplicaInfo {
+  std::string ts_uuid;
+  ReplicaRole role;
+};
+
+struct TabletInfo {
+  std::vector<TabletReplicaInfo> replicas_info;
+  boost::optional<int64_t> config_idx;  // For CAS-like change of Raft configs.
+};
+
+struct TableInfo {
+  std::string name;
+  int replication_factor;
+};
+
+// Information to describe tablet replica distribution on the cluster
+// from the perspective of the placement policy constraints.
+struct TabletsPlacementInfo {
+  // Tablet replica distribution information among locations:
+  // tablet_id --> { loc0: k, loc1: l, ..., locN: m }
+  std::unordered_map<std::string, std::unordered_map<std::string, int>>
+      tablet_location_info;
+
+  // tablet_id --> tablet information
+  std::unordered_map<std::string, TabletInfo> tablets_info;
+
+  // table_id --> table information
+  std::unordered_map<std::string, TableInfo> tables_info;
+
+  // Dictionary: mapping tablet_id into its table_id.
+  std::unordered_map<std::string, std::string> tablet_to_table_id;
+
+  // tserver_id --> total number of tablet replicas at the tserver.
+  std::unordered_map<std::string, int> replica_num_by_ts_id;
+};
+
+// Convert ClusterRawInfo into TabletsPlacementInfo.
+Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
+                                 TabletsPlacementInfo* info);
+
+// Information on a violation of the basic placement policy constraint.
+// The basic constraint is: for any tablet, no location should contain
+// the majority of the replicas of the tablet.
+struct PlacementPolicyViolationInfo {
+  std::string tablet_id;
+  std::string majority_location;
+  int replication_factor;
+  int replicas_num_at_majority_location;
+};
+
+// Given the information on replica placement in the cluster, detect violations
+// of the placement policy constraints and output that information into the
+// 'result_info' out parameter.
+Status DetectPlacementPolicyViolations(
+    const TabletsPlacementInfo& placement_info,
+    std::vector<PlacementPolicyViolationInfo>* result_info);
+
+// Given the information on tablet replica distribution for a cluster and
+// list of placement policy violations, find "best candidate" replicas to move,
+// reinstating the placement policy for corresponding tablets. The function
+// outputs only one (and the best) candidate replica per reported violation,
+// even if multiple replicas have to be moved to bring replica distribution
+// in compliance with the policy. It's assumed that the process of reimposing
+// the placement policy for the cluster is iterative, so it's necessary to
+// call DetectPlacementPolicyViolations() followed by
+// FindMovesToReimposePlacementPolicy() multiple times, until no policy
+// violations are reported or no candidates are found.
+Status FindMovesToReimposePlacementPolicy(
+    const TabletsPlacementInfo& placement_info,
+    const ClusterLocalityInfo& locality_info,
+    const std::vector<PlacementPolicyViolationInfo>& violations_info,
+    std::vector<Rebalancer::ReplicaMove>* replicas_to_remove);
+
+} // namespace tools
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
index 1db0d07..e099051 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/tools/rebalance_algo.h
@@ -19,7 +19,9 @@
 #include <cstdint>
 #include <map>
 #include <random>
+#include <set>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
@@ -33,8 +35,9 @@ template <class T> class optional;
 namespace kudu {
 namespace tools {
 
-// A map from a count of replicas to a server id. The "reversed" relationship
-// facilitates finding the servers with the maximum and minimum counts.
+// A map from a count of replicas to a server identifier. The "reversed"
+// relationship facilitates finding the servers with the maximum and minimum
+// replica counts.
 typedef std::multimap<int32_t, std::string> ServersByCountMap;
 
 // Balance information for a table.
@@ -59,13 +62,22 @@ struct ClusterBalanceInfo {
   // skew between them decreases, or when the skew decreases.
   std::multimap<int32_t, TableBalanceInfo> table_info_by_skew;
 
-  // Mapping total replica count -> tablet server.
+  // Mapping total replica count -> tablet server identifier.
   //
   // The total replica count of a tablet server is defined as the total number
   // of replicas hosted on the tablet server.
   ServersByCountMap servers_by_total_replica_count;
 };
 
+struct ClusterLocalityInfo {
+  // Location-related information: distribution of tablet servers by locations.
+  // Mapping 'location' --> 'identifiers of tablet servers in the location'.
+  std::unordered_map<std::string, std::set<std::string>> servers_by_location;
+
+  // Mapping 'tablet server identifier' --> 'location'.
+  std::unordered_map<std::string, std::string> location_by_ts_id;
+};
+
 // A directive to move some replica of a table between two tablet servers.
 struct TableReplicaMove {
   std::string table_id;

http://git-wip-us.apache.org/repos/asf/kudu/blob/81d0109b/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 3f7f0b1..a6e4caf 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -28,14 +28,20 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <gtest/gtest_prod.h>
 
 #include "kudu/client/shared_ptr.h"
+#include "kudu/tools/ksck_results.h"
 #include "kudu/tools/rebalance_algo.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
+namespace tools {
+struct TabletsPlacementInfo;
+}  // namespace tools
+}  // namespace kudu
+
+namespace kudu {
 
 namespace client {
 class KuduClient;
@@ -46,6 +52,13 @@ namespace tools {
 class Ksck;
 struct KsckResults;
 
+// Sub-set of fields from KsckResult which are relevant to the rebalancing.
+struct ClusterRawInfo {
+  std::vector<KsckServerHealthSummary> tserver_summaries;
+  std::vector<KsckTableSummary> table_summaries;
+  std::vector<KsckTabletSummary> tablet_summaries;
+};
+
 // A class implementing logic for Kudu cluster rebalancing.
 class Rebalancer {
  public:
@@ -96,6 +109,7 @@ class Rebalancer {
     std::string tablet_uuid;
     std::string ts_uuid_from;
     std::string ts_uuid_to;
+    boost::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes
   };
 
   enum class RunStatus {
@@ -288,7 +302,6 @@ class Rebalancer {
 
   // Auxiliary Ksck object to get information on the cluster.
   std::shared_ptr<Ksck> ksck_;
-
 };
 
 } // namespace tools

Reply via email to