Andrew Wong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/14177 )

Change subject: WIP KUDU-2780: create thread for auto-rebalancing
......................................................................


Patch Set 3:

(35 comments)

Didn't look at auto_rebalancer-test yet

http://gerrit.cloudera.org:8080/#/c/14177/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14177/3//COMMIT_MSG@8
PS3, Line 8: 
           : Created auto-rebalancer thread in background tasks of catalog 
manager.
           : Set up framework for auto-rebalancing loop.
           : Loop retrieves information on tservers, tables, and tablets for 
rebalancing.
           : The number of replica moves per loop iteration is controlled by a 
flag.
           : If there are placement policy violations, the current loop 
iteration will only
           : perform replica moves to reinstate the policy.
           : Otherwise, the auto-rebalancer will perform moves to do 
inter-location(cross-location),
           : then intra-location(by table then by tserver) rebalancing.
           : If the cluster is balanced, the current rebalancing cycle 
completes, and the
           : thread will sleep for an interval, controlled by another flag.
nit: I found this a little difficult to read. Could you split it up into 
paragraphs?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.h
File src/kudu/master/auto_rebalancer.h:

http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.h@93
PS3, Line 93:   // The autorebalancing thread.
            :   scoped_refptr<kudu::Thread> thread_;
We don't expect there to be multiple variables holding onto `thread_`, do we? 
Could this just be a unique_ptr?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.h@104
PS3, Line 104:   // The internal Rebalancer object.
             :   std::shared_ptr<rebalance::Rebalancer> rebalancer_;
Same here; are there expected to be multiple threads holding onto `rebalancer_`?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.h@106
PS3, Line 106:
             :   // Functions for the Rebalancer object:
             :
             :   // Collects information about the cluster.
             :   Status BuildClusterRawInfo(const boost::optional<std::string>& 
location,
             :                              rebalance::ClusterRawInfo* 
raw_info) const;
             :
             :   Status GetMovesUsingRebalancingAlgo(
             :       const rebalance::ClusterRawInfo& raw_info,
             :       std::vector<rebalance::Rebalancer::ReplicaMove>* 
replica_moves,
             :       rebalance::RebalancingAlgo* algo,
             :       bool cross_location = false) const;
             :
             :   Status GetMoves(const rebalance::ClusterRawInfo& raw_info,
             :                   const rebalance::ClusterInfo& ci,
             :                   std::vector<rebalance::Rebalancer::ReplicaMove>* replica_moves,
             :                   const rebalance::TabletsPlacementInfo& 
placement_info,
             :                   bool* policy_fixing) const;
             :
             :   Status GetTabletLeader(const std::string& tablet_id, 
std::string* leader_uuid, HostPort* leader_hp) const;
             :
             :   Status ExecuteMoves(const 
std::vector<rebalance::Rebalancer::ReplicaMove>& replica_moves, const bool& 
policy_fixing);
             :
             :   // Returns whether or not the cluster needs rebalancing.
             :   bool IsClusterBalanced(const rebalance::TabletsPlacementInfo& 
placement_info,
             :                          const rebalance::ClusterInfo& ci) const;
             :
             :   // Helper function to retrieve TSManager.
             :   TSManager* GetTSManager() const;
nit: Could you move these above the private member variables?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc
File src/kudu/master/auto_rebalancer.cc:

PS3:
nit: there's a bunch of stray whitespace sprinkled throughout.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@104
PS3, Line 104: DEFINE_string(auto_rebalancing_ignored_tservers, "",
             :               "UUIDs of tablet servers to ignore while 
auto-rebalancing the cluster "
             :               "(comma-separated list). If specified, allow the 
auto-rebalancing "
             :               "when some tablet servers in 'ignored_tservers' 
are unhealthy. "
             :               "If not specified, allow the rebalancing only when 
all tablet "
             :               "servers are healthy.");
             :
             : DEFINE_string(auto_rebalancing_tables, "",
             :               "Tables to rebalance (comma-separated list of 
table names)"
             :               "If not specified, includes all tables.");
             :
It seems like these are initialized once and cached, so it seems a bit odd to 
have these as flags, seeing as we can't ever update them. Can you think of a 
use case in which these are actually useful when deploying a master, versus 
setting them through some RPC or something?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@119
PS3, Line 119: // A typical Kudu cluster has 5+ nodes.
What exactly is the guidance here? Should the value of this flag be correlated 
with the number of nodes?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@144
PS3, Line 144:   Rebalancer::Config c = Rebalancer::Config(
nit: if you stored this as a const private member, you wouldn't have to add 
ShouldRunPolicyFixer() and co.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@146
PS3, Line 146: master_addresses
Does this get used anywhere?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@190
PS3, Line 190:     {
             :       std::lock_guard<Mutex> l(lock_);
             :       if (closing_) return;
             :     }
If we're only locking to protect `closing_`, a spinlock might be more 
appropriate. Or an atomic<bool>.
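
To illustrate the atomic option, here is a rough, single-threaded sketch. 
`LoopTask`, `RunLoop()`, and `Shutdown()` are made-up names for illustration 
only, not the actual AutoRebalancerTask interface; the point is just that a 
lone boolean shutdown flag can be read and written without taking a mutex.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the task; only the shutdown flag is modeled.
class LoopTask {
 public:
  // Signals the loop to stop. Safe to call from another thread.
  void Shutdown() { closing_.store(true); }

  // Runs at most 'max_iterations' iterations and returns how many completed
  // before the closing flag was observed.
  int RunLoop(int max_iterations) {
    assert(max_iterations >= 0);
    int completed = 0;
    for (int i = 0; i < max_iterations; ++i) {
      // No lock needed: the flag is the only shared state being checked.
      if (closing_.load()) break;
      ++completed;
    }
    return completed;
  }

 private:
  std::atomic<bool> closing_{false};
};
```

If more state ends up being protected alongside `closing_` later on, keeping 
a lock may still be the simpler choice.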


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@195
PS3, Line 195:     // If not the leader, don't do anything.
             :     {
             :       CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
             :       if (!l.first_failed_status().ok()) {
             :         number_of_loop_iterations = 0;
             :         moves_scheduled_this_round = 0;
             :         continue;
             :       }
             :     }
Rather than repeatedly checking whether we're leader (and locking in the 
process of doing so!), could we check if we're leader once and return if not? 
And then when we are elected leader, start up?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@238
PS3, Line 238:       SleepFor(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds));
Should we sleep for all the other continue cases?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@254
PS3, Line 254:
             :       // Check flag again, in case auto-rebalancing flag has 
been paused.
             :       if (FLAGS_auto_rebalancing_stop_flag) continue;
Seems like not much happened between our initial check (no RPCs or anything 
AFAICT), so it's probably safe to proceed without this.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@271
PS3, Line 271: Status AutoRebalancerTask::GetMoves(const ClusterRawInfo& 
raw_info, const ClusterInfo& ci, vector<Rebalancer::ReplicaMove>* 
replica_moves, const TabletsPlacementInfo& placement_info, bool* policy_fixing) 
const {
nit: wrap this. Same elsewhere.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@279
PS3, Line 279:
             :   if (ts_id_by_location.size() == 1) {
             :     rebalance::TwoDimensionalGreedyAlgo algo;
             :     RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(raw_info, 
replica_moves, &algo));
             :     *policy_fixing = false;
             :   } else {
             :
             :     if (rebalancer_->ShouldRunPolicyFixer()) {
             :       vector<PlacementPolicyViolationInfo> ppvi;
             :       RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
             :       // Filter out all reported violations which are already 
taken care of.
             :       RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
             :           placement_info, ci.locality, ppvi, replica_moves));
             :       *policy_fixing = true;
             :     }
             :
             :     if (replica_moves->empty()) {
             :       if (rebalancer_->ShouldRunCrossLocationRebalancing()) {
             :         rebalance::LocationBalancingAlgo algo(1.0);
             :         RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(
             :             raw_info, replica_moves, &algo, 
/*cross_location=*/true));
             :       }
             :
             :       if (rebalancer_->ShouldRunIntraLocationRebalancing() && 
replica_moves->size() < max_moves) {
             :         rebalance::TwoDimensionalGreedyAlgo algo;
             :         for (const auto& elem : ts_id_by_location) {
             :           const auto& location = elem.first;
             :           ClusterRawInfo location_raw_info;
             :           BuildClusterRawInfo(location, &location_raw_info);
             :           RETURN_NOT_OK(GetMovesUsingRebalancingAlgo(
             :             location_raw_info, replica_moves, &algo));
             :         }
             :       }
             :       *policy_fixing = false;
             :     }
             :   }
             :
             :   return Status::OK();
             : }
nit: This could use some comments.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@377
PS3, Line 377:     std::shuffle(tablet_ids.begin(), tablet_ids.end(), 
random_device());
             :     string move_tablet_id;
             :     for (const auto& tablet_id : tablet_ids) {
             :       if (tablets_in_move.find(tablet_id) == 
tablets_in_move.end()) {
             :         // For now, choose the very first tablet that does not 
have replicas
             :         // in move. Later on, additional logic might be added to 
find
             :         // the best candidate.
             :         move_tablet_id = tablet_id;
             :         break;
             :       }
             :     }
             :     if (move_tablet_id.empty()) {
             :       return Status::NotFound(Substitute(
             :           "table $0: could not find any suitable replica to move 
"
             :           "from server $1 to server $2", move.table_id, 
move.from, move.to));
             :     }
             :     Rebalancer::ReplicaMove move_info;
             :     move_info.tablet_uuid = move_tablet_id;
             :     move_info.ts_uuid_from = move.from;
             :     const auto& extra_info = FindOrDie(extra_info_by_tablet_id, 
move_tablet_id);
             :     if (extra_info.replication_factor < extra_info.num_voters) {
             :       // The number of voter replicas is greater than the target 
replication
             :       // factor. It might happen the replica distribution would 
be better
             :       // if just removing the source replica. Anyway, once a 
replica is removed,
             :       // the system will automatically add a new one, if needed, 
where t
Seems like a good chunk of this is taken from rebalancer_tool.cc. Can we 
refactor this out and use it in both implementations?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@439
PS3, Line 439: Status AutoRebalancerTask::ExecuteMoves(const 
vector<Rebalancer::ReplicaMove>& replica_moves, const bool& policy_fixing) {
Please add comments


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@452
PS3, Line 452:       RETURN_NOT_OK(GetTabletLeader(tablet_id, &leader_uuid, 
&leader_hp));
We called CatalogManager::GetTabletLocations() when we computed the cluster 
stats. Would it be very difficult to reuse the locations found there?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@479
PS3, Line 479:   } else { // ScheduleReplicaMove()
How difficult would it be to refactor and share across implementations?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@495
PS3, Line 495:       shared_ptr<Messenger> messenger;
Could we build this once for the Task and reuse it?

Alternatively, there's a messenger in Master. Is it safe to plumb that all the 
way down here?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@514
PS3, Line 514:         TSManager* ts_manager = GetTSManager();
             :         if (!ts_manager->LookupTSByUUID(dst_ts_uuid, &desc)) {
             :           return Status::NotFound("Could not find destination 
tserver");
             :         }
This lookup could probably be done before updating anything.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@538
PS3, Line 538:   const boost::optional<string>& location,
             :   ClusterRawInfo* raw_info) const {
nit: indent a couple more here


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@545
PS3, Line 545: GetTSManager
Why not call catalog_manager_->ts_manager() directly?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@561
PS3, Line 561:   // GetAllTables() requires holding leader lock.
nit: remove this


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@620
PS3, Line 620:   }
             :
             :   else {
nit: join these lines


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@637
PS3, Line 637: decltype(summary.replicas)
nit: this is much less readable than specifying the actual type
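
For example, something along these lines. `ReplicaSummary`, `TabletSummary`, 
and `ReplicasAtLocation()` below are simplified, hypothetical stand-ins (the 
real summary types have more fields and may be named differently), so treat 
this as a sketch of the style rather than a drop-in change.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, pared-down versions of the summary types.
struct ReplicaSummary {
  std::string ts_uuid;
};
struct TabletSummary {
  std::vector<ReplicaSummary> replicas;
};

// Returns the replicas of 'summary' hosted on any tserver in 'ts_ids'.
std::vector<ReplicaSummary> ReplicasAtLocation(
    const TabletSummary& summary,
    const std::unordered_set<std::string>& ts_ids) {
  // Spelling out the type instead of decltype(summary.replicas) tells the
  // reader what the container is without looking up the struct.
  std::vector<ReplicaSummary> replicas_at_location;
  replicas_at_location.reserve(summary.replicas.size());
  for (const auto& replica : summary.replicas) {
    if (ts_ids.count(replica.ts_uuid) > 0) {
      replicas_at_location.push_back(replica);
    }
  }
  assert(replicas_at_location.size() <= summary.replicas.size());
  return replicas_at_location;
}
```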


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@615
PS3, Line 615:   if (!location) {
             :     // Information on the whole cluster.
             :     raw_info->tserver_summaries = std::move(tserver_summaries);
             :     raw_info->tablet_summaries = std::move(tablet_summaries);
             :     raw_info->table_summaries = std::move(table_summaries);
             :   }
             :
             :   else {
             :     // Information on the specified location only: filter out 
non-relevant info.
             :     const auto& location_str = *location;
             :
             :     unordered_set<string> ts_ids_at_location;
             :     for (const auto& summary : tserver_summaries) {
             :       if (summary.ts_location == location_str) {
             :         raw_info->tserver_summaries.push_back(summary);
             :         InsertOrDie(&ts_ids_at_location, summary.uuid);
             :       }
             :     }
             :
             :     unordered_set<string> table_ids_at_location;
             :     for (const auto& summary : tablet_summaries) {
             :       const auto& replicas = summary.replicas;
             :       decltype(summary.replicas) replicas_at_location;
             :       replicas_at_location.reserve(replicas.size());
             :       for (const auto& replica : replicas) {
             :         if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
             :           replicas_at_location.push_back(replica);
             :         }
             :       }
             :       if (!replicas_at_location.empty()) {
             :         table_ids_at_location.insert(summary.table_id);
             :       }
             :       raw_info->tablet_summaries.push_back(summary);
             :       raw_info->tablet_summaries.back().replicas = 
std::move(replicas_at_location);
             :     }
             :
             :     for (const auto& summary : table_summaries) {
             :       if (ContainsKey(table_ids_at_location, summary.id)) {
             :         raw_info->table_summaries.push_back(summary);
             :       }
             :     }
             :   }
Can this filtering be done in the above loop itself? Why or why not?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@665
PS3, Line 665: auto
Could these be const refs too? Same below?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@667
PS3, Line 667: 1
Is it ok for this to be hardcoded? Should there be some threshold for it? Are 
we guaranteed to eventually get no skew? Same below.
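
One possible shape for a non-hardcoded check is sketched below. 
`IsSkewAboveThreshold()` and `max_allowed_skew` are hypothetical names, and 
the map type is only my assumption about what 
`servers_by_total_replica_count` looks like (replica count as the key, 
server UUID as the value). The sketch also guards against an empty map, 
which the current code would dereference.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical helper: returns true if the spread between the most and least
// loaded servers exceeds 'max_allowed_skew'.
bool IsSkewAboveThreshold(
    const std::multimap<int32_t, std::string>& servers_by_count,
    int32_t max_allowed_skew) {
  assert(max_allowed_skew >= 0);
  // With no servers there is nothing to compare, so report no skew.
  if (servers_by_count.empty()) return false;
  // The multimap is ordered by key, so the ends hold the min and max counts.
  const int32_t min_count = servers_by_count.begin()->first;
  const int32_t max_count = servers_by_count.rbegin()->first;
  return max_count - min_count > max_allowed_skew;
}
```

The threshold could then come from a flag or a constant with a comment 
explaining why that value was chosen.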


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@661
PS3, Line 661: bool IsClusterOrTablesSkewed(const ClusterInfo& ci) {
             :
             :   // Cluster skew?
             :   const auto& servers_load_info = 
ci.balance.servers_by_total_replica_count;
             :   const auto min_replica_count = 
servers_load_info.begin()->first;
             :   const auto max_replica_count = 
servers_load_info.rbegin()->first;
             :   if (max_replica_count-min_replica_count > 1) return true;
             :
             :   // Table skew?
             :   const auto& table_skew_info = ci.balance.table_info_by_skew;
             :   const auto min_table_skew = table_skew_info.begin()->first;
             :   const auto max_table_skew = table_skew_info.rbegin()->first;
             :   if (max_table_skew-min_table_skew > 1) return true;
             :
             :   return false;
             : }
nit: Put this in an anonymous namespace.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/auto_rebalancer.cc@690
PS3, Line 690: if (IsClusterOrTablesSkewed(ci)) return false;
nit: maybe write this so we don't have to have the extra else scope

if (ts_id_by_location.size() == 1) {
  if (IsClusterOrTablesSkewed(ci)) {
    return false;
  }
  return true;
}

 // Multiple locations...


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/catalog_manager.cc
File src/kudu/master/catalog_manager.cc:

http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/catalog_manager.cc@287
PS3, Line 287: true
We should probably set this to false by default.


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/ts_descriptor.h
File src/kudu/master/ts_descriptor.h:

http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/ts_descriptor.h@160
PS3, Line 160:   HostPort last_location_HP() const;
This is a misnomer; how about `last_address()` or something?


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/ts_descriptor.h@159
PS3, Line 159:   // Return the TS address: the last known host/port.
             :   HostPort last_location_HP() const;
             :
             :   // Return a string form of the TS address: the last known 
host/port.
             :   std::string last_location() const;
Why do we need both of these? Wouldn't it suffice to use `HostPort::ToString()` 
if we need the string?
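
Roughly what I have in mind is below. The `HostPort` struct here is a tiny 
hypothetical stand-in, not the real kudu::HostPort, and `TSDescriptorSketch` 
and `last_address()` are illustrative names only. It shows a single accessor, 
with callers formatting the address themselves when they need a string.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for a host/port pair with a string form.
struct HostPort {
  std::string host;
  uint16_t port;
  std::string ToString() const { return host + ":" + std::to_string(port); }
};

// Hypothetical descriptor exposing only one address accessor.
class TSDescriptorSketch {
 public:
  explicit TSDescriptorSketch(HostPort hp) : last_address_(std::move(hp)) {
    assert(!last_address_.host.empty());
  }

  // Returns the last known host/port of the tserver.
  const HostPort& last_address() const { return last_address_; }

 private:
  HostPort last_address_;
};
```

A caller that wants the string form would then write 
`desc.last_address().ToString()` instead of needing a second accessor.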


http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/ts_descriptor.cc
File src/kudu/master/ts_descriptor.cc:

http://gerrit.cloudera.org:8080/#/c/14177/3/src/kudu/master/ts_descriptor.cc@301
PS3, Line 301: 0
Can there be multiple of these? What about the other ones?



--
To view, visit http://gerrit.cloudera.org:8080/14177
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ifca25d1063c07047cf2123e6792b3c7395be20e4
Gerrit-Change-Number: 14177
Gerrit-PatchSet: 3
Gerrit-Owner: Hannah Nguyen <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Hannah Nguyen <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Tidy Bot (241)
Gerrit-Comment-Date: Sat, 14 Sep 2019 03:49:35 +0000
Gerrit-HasComments: Yes
