leventov opened a new issue #7458: New Coordinator segment balancing/loading 
algorithm
URL: https://github.com/apache/incubator-druid/issues/7458
 
 
   ### Motivation and goal
   
   Currently, there is no coherent segment balancing/loading algorithm. There is just code that happens to behave in some way that, likely, nobody fully understands. This leads to incoherent/redundant configurations (#7159) and defects (#7202, #7383, #6329). It should also be possible to significantly speed up balancing/loading by not waiting until all segment movements from the previous balancing burst are done (#7159, #7344).
   
   The goal of this proposal is to establish terminology, introduce a coherent and more operationally friendly set of configuration parameters, and define the balancing/loading algorithm. I then intend to implement all of the above.
   
   ### Terminology
   
   First, let's establish terminology.
   
   There are two groups of segments:
    - *Replicated segments*: governed by LoadRules.
    - *Broadcasted segments*: governed by BroadcastDistributionRules.
   
   *Unreplicated segments* are replicated segments that should be, but are not currently, loaded by any historical server.
   
   *Underreplicated segments (in a tier)* are replicated segments whose target replication count (*replicants*) in the tier is higher than the number of historical servers in the tier currently loading the segment. In the algorithm below, unreplicated and underreplicated (or fully replicated) segments are considered disjoint groups: unreplicated segments are *not* called underreplicated in every tier with zero currently loading servers. But as soon as a segment is loaded by at least one server in any tier, it becomes underreplicated in *all* other tiers where it should be loaded.
   
   *Fully replicated segments (in a tier)* are replicated segments whose target replication count in the tier is equal to the number of historical servers in the tier currently loading the segment.
   
   *Overreplicated segments (in a tier)* are replicated segments whose target replication count in the tier is lower than the number of historical servers in the tier currently loading the segment.
   
   *Underbroadcasted segments (in a tier)* are broadcasted segments that are not loaded on some of the servers where they should be loaded. These servers include decommissioning servers, except those decommissioning servers that currently hold only broadcasted segments (see #7383).
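
   To make the replication statuses concrete, here is a minimal sketch (in Java, with hypothetical names; not part of the proposal itself) of classifying a segment in a given tier from its target and actual replicant counts:

```java
/** Hypothetical illustration of the per-tier replication terminology above. */
enum ReplicationStatus
{
  UNREPLICATED,     // not loaded by any historical server in any tier
  UNDERREPLICATED,  // loaded somewhere, but below the tier's target
  FULLY_REPLICATED, // loaded in the tier exactly at the target
  OVERREPLICATED    // loaded in the tier above the target
}

static ReplicationStatus classify(
    int targetReplicantsInTier,
    int loadedReplicantsInTier,
    int loadedReplicantsInAllTiers
)
{
  if (loadedReplicantsInAllTiers == 0) {
    // Unreplicated segments form a disjoint group: they are not called
    // underreplicated even though every tier has zero loading servers.
    return ReplicationStatus.UNREPLICATED;
  } else if (loadedReplicantsInTier < targetReplicantsInTier) {
    return ReplicationStatus.UNDERREPLICATED;
  } else if (loadedReplicantsInTier == targetReplicantsInTier) {
    return ReplicationStatus.FULLY_REPLICATED;
  } else {
    return ReplicationStatus.OVERREPLICATED;
  }
}
```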
   
   ### Configurations
   
   #### Existing configuration, still used
    - `maxSegmentsInNodeLoadingQueue`.
   
   #### New configurations
    - `numBestServersChoosingFunction`, a JavaScript function. Defaults to 
`function (numServersInTier, targetReplicants) { return targetReplicants + 
Math.floor(Math.log(numServersInTier) / 
Math.log(numBestServersChoosingFunctionDefaultLogBase)); }`
    - `numBestServersChoosingFunctionDefaultLogBase`, defaults to 5. Determines the specific shape of the default `numBestServersChoosingFunction`, which determines the number of best servers (according to `CostBalancerStrategy`) in a tier to consider loading a segment on. The single best server may be unavailable for loading if its loading queue is already full (with several different definitions of "full", explained below). In the current algorithm, `numBestServersChoosingFunction` is implicitly `return targetReplicants;` (if we were to define it); in other words, we always consider only the `targetReplicants` "best" servers in a tier for loading.
   
   `numBestServersChoosingFunction` (or `numBestServersChoosingFunctionDefaultLogBase`, if used to adjust the default function) is the single parameter that allows trading loading speed off against precision: a higher number allows higher balancing throughput, but balancing decisions may be less optimal.
   
   With the default function equivalent to `return targetReplicants + Math.floor(Math.log(numServersInTier) / Math.log(5));`, the behavior will be as "precise" as the current behavior in tiers of fewer than 5 servers, with a +1 server "imprecision allowance" in tiers of 5 to 24 servers, +2 servers in tiers of 25 to 124 servers, etc.
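
   For readability, here is a Java rendering of the same default function (the JavaScript above is the actual proposed configuration; this port and its sample values are only illustrative):

```java
// Java rendering of the default numBestServersChoosingFunction, for illustration only.
static int numBestServers(int numServersInTier, int targetReplicants, int logBase)
{
  return targetReplicants + (int) Math.floor(Math.log(numServersInTier) / Math.log(logBase));
}

// With the default log base of 5 and targetReplicants = 2:
//   numBestServers(4, 2, 5)  == 2  (as "precise" as the current algorithm)
//   numBestServers(10, 2, 5) == 3  (+1 server of "imprecision allowance")
//   numBestServers(30, 2, 5) == 4  (+2 servers)
```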
   
    - `loadingMinPercent`, defaults to 70.
    - `decommissioningMinPercent`, defaults to 10.
    - `decommissioningMaxPercent`, defaults to 70, or 
`decommissioningMaxPercentOfMaxSegmentsToMove` if this legacy parameter is 
specified.
    - `balancingMinPercent`, defaults to 20.
   
   There are no "loadingMaxPercent" and "balancingMaxPercent" parameters because there is no reason to cap loading and balancing (as long as all "minPercents" are satisfied).
   
   `loadingMinPercent` + `decommissioningMinPercent` + `balancingMinPercent` 
must be equal to 100. If only one of these three parameters is specified, the 
other two are determined by applying their default proportions to `100 - 
theSpecifiedMinPercent`. If two of these three parameters are specified, the 
third one is determined by complementing to 100.
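
   For example, if only `loadingMinPercent = 50` is specified, the remaining 50 is split between the other two parameters in their default 10:20 proportion. A hypothetical sketch of this derivation (names are illustrative):

```java
// Hypothetical sketch: derive the two unspecified "minPercents" from the one specified.
static double[] resolveMinPercents(double loadingMinPercent)
{
  // Defaults: loading 70, decommissioning 10, balancing 20 (summing to 100).
  double remaining = 100 - loadingMinPercent;
  // Split the remainder in the default 10:20 proportion of the unspecified parameters.
  double decommissioningMinPercent = remaining * 10.0 / (10.0 + 20.0);
  double balancingMinPercent = remaining * 20.0 / (10.0 + 20.0);
  return new double[] {loadingMinPercent, decommissioningMinPercent, balancingMinPercent};
}

// resolveMinPercents(50) -> {50.0, 16.67, 33.33}, which again sums to 100.
```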
   
   These "minPercents" (and "maxPercents") are percentages of `maxSegmentsInNodeLoadingQueue`, not of `maxSegmentsToMove`. This is not reflected in the parameter names to avoid overspecifying the algorithm (as happened with `decommissioningMaxPercentOfMaxSegmentsToMove`, whose name should become obsolete very soon after its introduction).
   
    - `segmentLoadingMaxSecondsBeforeAlerting`, defaults to 900 (15 minutes), or to `replicantLifetime` * `druid.coordinator.period` if the legacy `replicantLifetime` parameter is specified.
   
   #### Retired configurations
    - `maxSegmentsToMove`
    - `replicationThrottleLimit`
    - `decommissioningMaxPercentOfMaxSegmentsToMove`, unless used to default 
`decommissioningMaxPercent` for backward compatibility.
    - `replicantLifetime`, unless used to default 
`segmentLoadingMaxSecondsBeforeAlerting` for backward compatibility.
   
   ### Proposed algorithm
   
   #### 1. Unreplicated segments
   
   A "loading" stage.
   
    1. Use `CostBalancerStrategy` to determine the top `B` "best" servers in every tier where the segment should be loaded, excluding decommissioning servers, where `B` is determined by `numBestServersChoosingFunction` (separately in each tier).
    2. Determine the "primary" tier (see #4757).
    3. Schedule loading of the segment on *one* server in the primary tier.
   
   Limit: at this stage, the loading queue of any server can't contain more than `maxSegmentsInNodeLoadingQueue * loadingMinPercent / 100` segments that were scheduled to be loaded on the server during one of the "loading" stages of the algorithm, including this stage. This may include segment loading requests hanging since the previous Coordinator's run (see #7159). Below, this kind of "typed" node loading queue limit is identified for short as *(relative)*; a sketch of such a check appears at the end of this stage.
   
   If a segment can't be loaded on any server in the "primary" tier because the loading queues of all `B` "best" servers are already sufficiently full with "loading" requests, step 3 is repeated for non-primary tiers.
   
   After this stage, unreplicated segments are split into the following groups:
    - ToBecomeFullyReplicated (per tier)
    - ToBecomeUnderreplicated (per tier)
    - StillUnreplicated
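
   A minimal sketch of what a *(relative)* typed queue limit check could look like (hypothetical names; the real bookkeeping would live in the load queue peons):

```java
import java.util.Map;

class RelativeQueueLimits
{
  /** The three request types the proposal distinguishes in a server's loading queue. */
  enum RequestType { LOADING, DECOMMISSIONING, BALANCING }

  /**
   * Hypothetical "(relative)" limit check: another request of the given type may be
   * enqueued on a server only while fewer than
   * maxSegmentsInNodeLoadingQueue * minPercent / 100 requests of that type are queued.
   */
  static boolean underRelativeLimit(
      Map<RequestType, Integer> queuedCountsByType,
      RequestType type,
      int maxSegmentsInNodeLoadingQueue,
      int minPercent
  )
  {
    int queuedOfType = queuedCountsByType.getOrDefault(type, 0);
    return queuedOfType < maxSegmentsInNodeLoadingQueue * minPercent / 100;
  }
}
```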
   
   #### 2. Underbroadcasted segments
   
   A "loading" stage.
   
   In every tier where the segment is underbroadcasted, schedule loading of the segment on all servers in the tier where the segment is not currently loaded.
   
   Limit: `loadingMinPercent` (relative).
   
   After this stage, underbroadcasted segments are split into the following 
groups:
    - ToBecomeFullyBroadcasted (per tier)
    - StillUnderbroadcasted (per tier)
   
   #### 3. Underreplicated segments
   
   A "loading" stage.
   
   The following steps are performed in every tier where a segment is 
underreplicated:
   
    1. Determine the top `B` "best" servers in the tier for the segment.
    2. Schedule loading of the segment on as many servers (among those determined in the previous step) as needed to match the target replication count in the tier. If the segment is also overreplicated in some other tier, add a callback that, upon successful loading in the underreplicated tier, schedules unloading of the segment from one of the servers in the other tier where it is loaded beyond the target replication count. (If the segment is loaded on more than one server in the tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the server to unload the segment from may require consulting `CostBalancerStrategy`.)
   
   Limit: `loadingMinPercent` (relative).
   
   After this stage, underreplicated segments are split into the following 
groups:
    - ToBecomeFullyReplicated (per tier). These per-tier groups were initialized during stage 1; segments are added to the existing groups.
    - StillUnderreplicated (per tier)
   
   #### 4. ToBecomeUnderreplicated segments
   
   A "loading" stage.
   
   In every tier where a segment is underreplicated, schedule loading of the 
segment on as many servers (among the top `B` "best" servers determined for the 
segment during stage 1) as needed to match the target replication count in the 
tier.
   
   Limit: `loadingMinPercent` (relative).
   
   After this stage, ToBecomeUnderreplicated segments are split into the 
following groups:
    - ToBecomeFullyReplicated (per tier). These per-tier groups were initialized during stage 1; segments are added to the existing groups.
    - ToBecomeUnderreplicated (per tier). In other words, segments that weren't 
scheduled for loading on enough servers in the tier due to the 
`loadingMinPercent` limit remain in the group where they were at the beginning 
of this stage.
   
   #### 5. Move replicated segments away from decommissioning servers
   
   A "decommissioning" stage.
   
   The following steps are performed in every tier, for every replicated 
segment loaded on one of the decommissioning servers in the tier:
   
    1. Determine the top `B` "best" servers in the tier for the segment, or reuse the top `B` servers already computed during stage 3 if the segment is underreplicated in the tier.
    2. Schedule loading of the segment on the servers determined in the previous step. Add a callback to schedule unloading of the segment from one of the decommissioning servers upon successful loading. (This type of operation is currently implemented in `DruidCoordinator.moveSegment()`; a sketch follows this list.)
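
   In essence, such a move is "load on the destination, then drop from the source once loading succeeds". A simplified sketch of that pattern (the peon API here is abbreviated and assumed; the real `DruidCoordinator.moveSegment()` additionally handles failures and races):

```java
// Simplified sketch of a "move" as load-then-unload, assuming a peon API with
// loadSegment(segment, callback) and dropSegment(segment, callback) methods.
void moveSegment(DataSegment segment, ServerHolder from, ServerHolder to)
{
  to.getPeon().loadSegment(
      segment,
      () -> {
        // Invoked upon successful loading on the destination server: only now
        // is it safe to drop the replica from the decommissioning server.
        from.getPeon().dropSegment(segment, null);
      }
  );
}
```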
   
   Limit: `decommissioningMinPercent` (relative).
   
   After this stage, the ToMoveAwayFromDecommServers segments (i.e. the segments considered at this stage) are split into the following groups:
    - ToBecomeNotLoadedOnDecommServers (per tier)
    - StillLoadedOnDecommServers (per tier)
   
   #### 6. First revisit of loading segments
   
   A "loading" stage.
   
   Repeat:
    1. stage 1 for StillUnreplicated segments,
    2. stage 2 for StillUnderbroadcasted segments,
    3. stage 3 for StillUnderreplicated segments,
    4. stage 4 for ToBecomeUnderreplicated segments,
   
   with the difference that the top `B` "best" servers computed during the previous stages are reused, and with a different limit (otherwise this stage wouldn't make much sense):
   
   Limit: at this stage, the loading queue of any server can't contain more than `maxSegmentsInNodeLoadingQueue * (loadingMinPercent + decommissioningMinPercent) / 100` segments. This may include segment loading requests hanging since the previous Coordinator's run.
   
   #### 7. Segment balancing
   
   A "balancing" stage.
   
   There is a loop performed for every tier in the cluster. Steps of the loop:
   
    1. Choose a segment randomly among the fully replicated and underreplicated segments (as of the beginning of the algorithm run) that need to be loaded in the tier.
    2. If the chosen segment has already been visited during this loop for the tier, mark this iteration as "failed" and proceed to the next iteration.
    3. Determine the top `B` "best" servers in the tier for the segment, or reuse the top `B` servers already computed during stage 3 or 5.
    4. If the segment shouldn't be moved because the servers where it is currently loaded are all among the top `B` "best" servers for the segment, mark this iteration as "failed" and proceed to the next iteration.
    
    *Alternative design:* we may decide that a segment movement is worthwhile whenever some server where the segment is not loaded (among the top `B`) is "better" than one of the servers where the segment is currently loaded, rather than only when one of the servers where the segment is currently loaded is out of the top `B` "best" servers. However, avoiding "too precise" balancing movements, such as moving a segment from the server ranked second to the server ranked first by cost (according to `CostBalancerStrategy`), leaves more room for segment movements of relatively higher importance before the loading queues of the servers fill up.
    5. If the segment can't be moved because the loading queues of all destination servers (those among the top `B`, while some servers where the segment is currently loaded are out of the top `B`) are already sufficiently full (`balancingMinPercent` (relative)), add the segment to the ToBeBalanced group (for the tier), mark the iteration as "failed", and proceed to the next iteration.
    6. Mark the iteration as "successful" and schedule a segment movement (as in stage 5, step 2).
   
   The loop exits when, among the last 100 iterations, at least 80 are "failed".
   
   Limit: `balancingMinPercent` (relative).
   
   *Comment:* the 80-out-of-100 "failed" iterations condition replaces the static bound `maxSegmentsToMove`. Its advantage is that it is independent of the tier size: it should work for all tiers, from a single server (in which case the first 100 iterations all fail and the loop exits) to hundreds of servers. I don't know whether 80 is a good threshold (and whether it should differ between tiers of different sizes), because we don't have #5987 running in our clusters yet, so we don't have good visibility into real "moved" and "unmoved" counts. **I need some input from the community to determine that.**
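
   A sketch of the exit condition as a fixed-size sliding window over iteration outcomes (100 and 80 being the tentative window size and threshold discussed above):

```java
// Hypothetical sliding-window tracker for the balancing loop's exit condition.
class BalancingLoopWindow
{
  private final boolean[] outcomes = new boolean[100]; // last 100 iterations, true = "failed"
  private int index = 0;
  private int iterations = 0;
  private int failedCount = 0;

  /** Record the outcome of one loop iteration. */
  void record(boolean iterationFailed)
  {
    if (iterations >= outcomes.length && outcomes[index]) {
      failedCount--; // the outcome falling out of the window was a failure
    }
    outcomes[index] = iterationFailed;
    if (iterationFailed) {
      failedCount++;
    }
    index = (index + 1) % outcomes.length;
    iterations++;
  }

  /** Exit when at least 80 of the last 100 iterations have "failed". */
  boolean shouldExit()
  {
    return iterations >= outcomes.length && failedCount >= 80;
  }
}
```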
   
   #### 8. Second revisit of loading segments
   
   A "loading" stage.
   
   Repeat stage 6, with the difference that on this stage the loading queue of any server can contain up to `maxSegmentsInNodeLoadingQueue` segments. This limit is also implicitly in place during all previous stages. This type of limit is identified for short as *full `maxSegmentsInNodeLoadingQueue`* below.
   
   #### 9. StillLoadedOnDecommServers segments
   
   A "decommissioning" stage.
   
   Repeat stage 5 for StillLoadedOnDecommServers segments, with a different limit: `decommissioningMaxPercent` (relative).
   
   #### 10. ToBeBalanced segments
   
   A "balancing" stage.
   
   In every tier, for every ToBeBalanced segment in that tier, schedule as many movements of the segment as possible from the servers where the segment is currently loaded that are out of the top `B` "best" servers in the tier for the segment to servers within the top `B`.
   
   Limit: full `maxSegmentsInNodeLoadingQueue`.
   
   #### 11. Repeat segment balancing
   
   Repeat stage 7 with a different limit: full `maxSegmentsInNodeLoadingQueue`. A segment is considered already visited in step 2 if it was visited during stage 7. Also, during this stage, segments don't need to be added to the ToBeBalanced group.
   
   #### 12. Unload excessively overreplicated segments
   
   For every segment, if the total replication count across all tiers exceeds the sum of the target replication counts across all tiers, schedule unloading of the segment from as many servers in the tiers where the segment is overreplicated as needed to match the total replication count with the sum of target replication counts (see the sketch below). If the segment is loaded on more than one server in a tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the servers to unload the segment from may require consulting `CostBalancerStrategy`.
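
   A sketch of the "excessively overreplicated" arithmetic (hypothetical names; counts are for a single segment):

```java
import java.util.Map;

class ExcessReplication
{
  /**
   * Hypothetical computation: how many replicas of a segment should be unloaded,
   * comparing the total loaded count across tiers with the sum of per-tier targets.
   */
  static int numExcessReplicants(
      Map<String, Integer> targetReplicantsByTier,
      Map<String, Integer> loadedReplicantsByTier
  )
  {
    int totalTarget = targetReplicantsByTier.values().stream().mapToInt(Integer::intValue).sum();
    int totalLoaded = loadedReplicantsByTier.values().stream().mapToInt(Integer::intValue).sum();
    // Unload this many replicas, from tiers where the segment is overreplicated.
    return Math.max(0, totalLoaded - totalTarget);
  }
}
```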
   
   Note that replicated segments to which one of the DropRules currently applies should be unloaded during this stage.
   
   "Non-excessively overreplicated" segments should be scheduled for unloading via the callbacks added during stage 3.
   
   #### 13. Unload broadcasted segments from decommissioning servers
   
   For every decommissioning server that is currently holding only broadcasted 
segments, schedule unloading all these segments from the server.
   
   ### Rationale
   
   Compared to the current behavior of the Coordinator, the proposed algorithm prioritizes availability as much as possible in the face of abrupt changes in the cluster, for example, when many Historical nodes become unavailable and many segments become unreplicated as a result. During stage 1, only a single replica of every unreplicated segment is scheduled for loading, giving other unreplicated segments a chance to be scheduled for loading before the loading queues of all "good" servers fill up.
   
   Note that the top `B` "best" servers are reused between the stages without recomputation, although some loading decisions made in between may affect the servers' scores. This is done to avoid repeating expensive `CostBalancerStrategy` computations many times for every segment: in the proposed algorithm, a segment may be visited up to five times during different stages in the context of a single tier (an underreplicated segment can be visited during stages 3, 6, 7, 8, and 10). I think this shouldn't affect the quality of cost-based balancing much; however, this opinion is not grounded in anything.
   
   ### Implementation notes
   
   I don't have a firm understanding of, or confidence in, what I write below in this section, because I haven't advanced much in the implementation yet.
   
   To handle many different groups of segments (up to dozens, depending on the number of tiers in the cluster), I plan to use bitsets. The index corresponds to the position of the segment in `DruidCoordinatorRuntimeParams.availableSegments` (to be renamed to `usedSegments` in #7306). To allow efficient indexing, this data structure is turned into a sorted array instead of a `TreeSet`. `set.contains()`-like operations are implemented via `Arrays.binarySearch()`. (This is pretty much how Guava's `ImmutableSortedSet` works.)
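
   A sketch of that data layout (hypothetical; `DataSegment` is Druid's segment descriptor, and the comparator is whichever ordering the sorted array uses):

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.Comparator;

// Hypothetical sketch of a segment group as a bitset over a sorted segment array.
class SegmentGroup
{
  private final DataSegment[] sortedSegments; // usedSegments as a sorted array
  private final Comparator<DataSegment> comparator;
  private final BitSet members = new BitSet();

  SegmentGroup(DataSegment[] sortedSegments, Comparator<DataSegment> comparator)
  {
    this.sortedSegments = sortedSegments;
    this.comparator = comparator;
  }

  void add(DataSegment segment)
  {
    int index = Arrays.binarySearch(sortedSegments, segment, comparator);
    if (index >= 0) {
      members.set(index);
    }
  }

  /** set.contains()-like check via binary search, as in Guava's ImmutableSortedSet. */
  boolean contains(DataSegment segment)
  {
    int index = Arrays.binarySearch(sortedSegments, segment, comparator);
    return index >= 0 && members.get(index);
  }
}
```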
   
   `LoadQueuePeon` implementations should internally handle three different 
types of segment loading requests: "loading", "decommissioning", and 
"balancing".
   
   As I mentioned in #7159, `SegmentReplicantLookup` should become a concurrent 
data structure.
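
   A minimal sketch of what "concurrent" could mean here, assuming the lookup is keyed by segment and tier (the real `SegmentReplicantLookup` has a richer interface):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical concurrent replicant lookup keyed by "segmentId|tier".
class ConcurrentReplicantLookup
{
  private final Map<String, Integer> loadedReplicants = new ConcurrentHashMap<>();

  void segmentLoaded(String segmentId, String tier)
  {
    loadedReplicants.merge(segmentId + "|" + tier, 1, Integer::sum);
  }

  void segmentDropped(String segmentId, String tier)
  {
    // Assumes drops are only recorded for previously loaded replicas.
    loadedReplicants.merge(segmentId + "|" + tier, -1, Integer::sum);
  }

  int getLoadedReplicants(String segmentId, String tier)
  {
    return loadedReplicants.getOrDefault(segmentId + "|" + tier, 0);
  }
}
```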
   
   ### Operational impact
   
   I think the new algorithm should become the default immediately, but I'll keep the old version of the code and provide an option to switch back to the old implementation in case the new algorithm fails in production for some unforeseen reason or due to bugs.
   
   It doesn't seem reasonable to me to keep the old implementation around for more than one Druid release.
   
   <hr>
   
   FYI @egor-ryashin @gianm @clintropolis
   
   FYI @jihoonson, see the parts of the algorithm related to 
BroadcastDistributionRules.
