dlmarion commented on code in PR #4733:
URL: https://github.com/apache/accumulo/pull/4733#discussion_r1671201318
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -960,39 +957,95 @@ private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> ts
}
}
- private long balanceTablets() {
-
- Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+ /**
+ * balanceTablets() balances tables by DataLevel. Return the current set of migrations
+ * partitioned by DataLevel
+ */
+ private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final
Set<KeyExtent> migrations) {
+ final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
new HashMap<>(DataLevel.values().length);
- migrationsSnapshot().forEach(ke -> {
- partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f ->
new HashSet<>())
- .add(ke);
+ // populate to prevent NPE
+ for (DataLevel dl : DataLevel.values()) {
+ partitionedMigrations.put(dl, new HashSet<>());
+ }
+ migrations.forEach(ke -> {
+ partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
+ });
+ return partitionedMigrations;
+ }
+
+ /**
+ * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map
+ * that only contains entries for tables in the DataLevel
+ */
+ private SortedMap<TServerInstance,TabletServerStatus>
createTServerStatusView(
+ final DataLevel dl, final
SortedMap<TServerInstance,TabletServerStatus> status) {
+ final SortedMap<TServerInstance,TabletServerStatus>
tserverStatusForLevel = new TreeMap<>();
+ status.forEach((k, v) -> {
+ TabletServerStatus copy = v.deepCopy();
+ Set<String> removals = new HashSet<>();
+ if (dl == DataLevel.USER) {
+ removals.add(RootTable.NAME);
+ removals.add(MetadataTable.NAME);
+ } else {
+ copy.getTableMap().keySet().forEach(tableName -> {
+ if (dl == DataLevel.ROOT && !tableName.equals(RootTable.NAME)) {
+ removals.add(tableName);
+ } else if (dl == DataLevel.METADATA &&
!tableName.equals(MetadataTable.NAME)) {
+ removals.add(tableName);
+ }
+ });
+ }
+ removals.forEach(copy.getTableMap()::remove);
+ tserverStatusForLevel.put(k, copy);
});
+ return tserverStatusForLevel;
+ }
+
+ private long balanceTablets() {
final int tabletsNotHosted = notHosted();
BalanceParamsImpl params = null;
long wait = 0;
long totalMigrationsOut = 0;
+ final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+ partitionMigrations(migrationsSnapshot());
+
for (DataLevel dl : DataLevel.values()) {
- final Set<KeyExtent> migrationsForLevel =
partitionedMigrations.get(dl);
- if (migrationsForLevel == null) {
- continue;
- }
if (dl == DataLevel.USER && tabletsNotHosted > 0) {
log.debug("not balancing user tablets because there are {} unhosted tablets",
tabletsNotHosted);
continue;
}
+ // Create a view of the tserver status such that it only contains the tables
+ // for this level in the tableMap.
+ final SortedMap<TServerInstance,TabletServerStatus>
tserverStatusForLevel =
+ createTServerStatusView(dl, tserverStatus);
+ // Construct the Thrift variant of the map above for the BalancerParams
+ final SortedMap<TabletServerId,TServerStatus>
tserverStatusForBalancerLevel =
+ new TreeMap<>();
+ tserverStatusForLevel.forEach((tsi, status) ->
tserverStatusForBalancerLevel
+ .put(new TabletServerIdImpl(tsi),
TServerStatusImpl.fromThrift(status)));
+
+ long migrationsOutCount = 0;
long migrationsOutForLevel = 0;
- int i = 0;
+ int attemptNum = 0;
do {
- i++;
- log.debug("Balancing for tables at level {}, times-in-loop: {}", dl,
i);
- params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
tserverStatus,
- migrationsForLevel);
+ log.debug("Balancing for tables at level {}, times-in-loop: {}", dl,
++attemptNum);
+ params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
+ tserverStatusForLevel, partitionedMigrations.get(dl));
wait = Math.max(tabletBalancer.balance(params), wait);
- migrationsOutForLevel = params.migrationsOut().size();
- for (TabletMigration m :
checkMigrationSanity(tserverStatusForBalancer.keySet(),
+ // The balancer may emit migrations for tables outside of the
+ // current level. We want to honor all migration requests, but
+ // need to keep track of the total for this level as well for
+ // the do-while loop condition.
+ migrationsOutCount = params.migrationsOut().size();
+ for (TabletMigration tm : params.migrationsOut()) {
+ if (dl == DataLevel.of(tm.getTablet().getTable())) {
+ migrationsOutForLevel++;
+ }
+ }
Review Comment:
Removed in f0f0601
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org