ctubbsii commented on a change in pull request #1891:
URL: https://github.com/apache/accumulo/pull/1891#discussion_r568076246
##########
File path:
core/src/main/java/org/apache/accumulo/core/dataImpl/TabletIdImpl.java
##########
@@ -73,4 +80,16 @@ public String toString() {
public Range toRange() {
return ke.toDataRange();
}
+
+ public KeyExtent toKeyExtent() {
+ return ke;
+ }
+
+ public static KeyExtent toKeyExtent(TabletId tabletId) {
+ if (tabletId instanceof TabletIdImpl) {
+ return ((TabletIdImpl) tabletId).toKeyExtent();
+ } else {
+ return new KeyExtent(tabletId.getTable(), tabletId.getEndRow(), tabletId.getPrevEndRow());
+ }
+ }
Review comment:
Could replace this with a new static factory method in KeyExtent instead
of putting it here, especially if TabletId is a public type. That would be
consistent with the other static factory methods there (which are more
expressive than what we get with constructors).
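For example, a minimal sketch of such a factory. The name `fromTabletId` is hypothetical, and `TabletId`, `KeyExtent`, and `TabletIdImpl` below are simplified stand-ins (Strings instead of `TableId`/`Text`), not the real Accumulo types. The point is that the `instanceof` fast path and the field-by-field fallback sit next to the other static factories on `KeyExtent`:

```java
import java.util.Objects;

// Simplified stand-in for org.apache.accumulo.core.data.TabletId.
// Assumption: table ids and rows are Strings here; the real API uses TableId and Text.
interface TabletId {
  String getTable();

  String getEndRow();

  String getPrevEndRow();
}

// Simplified stand-in for KeyExtent, reduced to what the factory sketch needs.
final class KeyExtent {
  private final String tableId;
  private final String endRow; // null means the tablet extends to +infinity
  private final String prevEndRow; // null means the tablet starts at -infinity

  KeyExtent(String tableId, String endRow, String prevEndRow) {
    this.tableId = Objects.requireNonNull(tableId, "tableId");
    this.endRow = endRow;
    this.prevEndRow = prevEndRow;
  }

  // Hypothetical factory: reuse the wrapped extent when the TabletId is our own
  // implementation, otherwise copy the three identifying fields.
  static KeyExtent fromTabletId(TabletId tabletId) {
    Objects.requireNonNull(tabletId, "tabletId");
    if (tabletId instanceof TabletIdImpl) {
      return ((TabletIdImpl) tabletId).toKeyExtent();
    }
    return new KeyExtent(tabletId.getTable(), tabletId.getEndRow(), tabletId.getPrevEndRow());
  }

  String tableId() { return tableId; }

  String endRow() { return endRow; }

  String prevEndRow() { return prevEndRow; }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof KeyExtent)) {
      return false;
    }
    KeyExtent other = (KeyExtent) o;
    return tableId.equals(other.tableId) && Objects.equals(endRow, other.endRow)
        && Objects.equals(prevEndRow, other.prevEndRow);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tableId, endRow, prevEndRow);
  }
}

// Stand-in for TabletIdImpl: a thin public-API wrapper around a KeyExtent.
final class TabletIdImpl implements TabletId {
  private final KeyExtent ke;

  TabletIdImpl(KeyExtent ke) {
    this.ke = Objects.requireNonNull(ke, "ke");
  }

  KeyExtent toKeyExtent() {
    return ke;
  }

  @Override
  public String getTable() { return ke.tableId(); }

  @Override
  public String getEndRow() { return ke.endRow(); }

  @Override
  public String getPrevEndRow() { return ke.prevEndRow(); }
}
```

Call sites such as `balanceTablets()` would then read `KeyExtent.fromTabletId(m.getTablet())`, and `TabletIdImpl` would only need its instance `toKeyExtent()`.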
##########
File path: server/manager/src/main/java/org/apache/accumulo/master/Master.java
##########
@@ -852,34 +866,57 @@ private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> ts
}
private long balanceTablets() {
- List<TabletMigration> migrationsOut = new ArrayList<>();
- long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus),
- migrationsSnapshot(), migrationsOut);
-
- for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(),
- migrationsOut)) {
- if (migrations.containsKey(m.tablet)) {
+ BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
+ tserverStatus, migrationsSnapshot());
+ long wait = tabletBalancer.balance(params);
+
+ for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+ params.migrationsOut())) {
+ KeyExtent ke = TabletIdImpl.toKeyExtent(m.getTablet());
+ if (migrations.containsKey(ke)) {
log.warn("balancer requested migration more than once, skipping {}", m);
continue;
}
- migrations.put(m.tablet, m.newServer);
+ TServerInstance tserverInstance = TabletServerIdImpl.toThrift(m.getNewTabletServer());
+ migrations.put(ke, tserverInstance);
log.debug("migration {}", m);
}
- if (migrationsOut.isEmpty()) {
+ if (params.migrationsOut().isEmpty()) {
synchronized (balancedNotifier) {
balancedNotifier.notifyAll();
}
} else {
- nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(),
+ nextEvent.event("Migrating %d more tablets, %d total", params.migrationsOut().size(),
migrations.size());
}
return wait;
}
+ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
+ List<TabletMigration> migrations) {
+ return migrations.stream().filter(m -> {
+ boolean includeMigration = false;
+ if (m.getTablet() == null) {
+ log.warn("Balancer gave back a null tablet {}", m);
+ } else if (m.getNewTabletServer() == null) {
+ log.warn("Balancer did not set the destination {}", m);
+ } else if (m.getOldTabletServer() == null) {
+ log.warn("Balancer did not set the source {}", m);
+ } else if (!current.contains(m.getOldTabletServer())) {
+ log.warn("Balancer wants to move a tablet from a server that is not current: {}", m);
+ } else if (!current.contains(m.getNewTabletServer())) {
+ log.warn("Balancer wants to move a tablet to a server that is not current: {}", m);
Review comment:
Most of these seem like they should be errors instead of warnings, since
they indicate an incorrectly behaving balancer. I think the "wants to move ...
not current" cases are probably okay as warnings, though, because they could
happen as a result of things changing on the cluster after the balancer made
its choice.
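A rough sketch of that split, assuming simplified stand-in types (`Migration`, `MigrationSanity`, and `problemLevel` are hypothetical names, servers are plain Strings) and using `java.util.logging` in place of SLF4J so it runs on the JDK alone. Malformed migrations are reported at error (`SEVERE`) level, while stale-server cases stay at warning level:

```java
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

// Simplified stand-in for TabletMigration; tablets and servers are plain Strings here.
final class Migration {
  final String tablet;
  final String oldServer;
  final String newServer;

  Migration(String tablet, String oldServer, String newServer) {
    this.tablet = tablet;
    this.oldServer = oldServer;
    this.newServer = newServer;
  }

  @Override
  public String toString() {
    return tablet + ": " + oldServer + " -> " + newServer;
  }
}

final class MigrationSanity {
  private static final Logger log = Logger.getLogger(MigrationSanity.class.getName());

  // Returns the level at which to report a bad migration, or null if the migration is valid.
  static Level problemLevel(Migration m, Set<String> current) {
    if (m.tablet == null || m.oldServer == null || m.newServer == null) {
      // Malformed output: the balancer itself is misbehaving, so report an error.
      return Level.SEVERE;
    }
    if (!current.contains(m.oldServer) || !current.contains(m.newServer)) {
      // Plausible race: the set of tablet servers changed after the balancer decided.
      return Level.WARNING;
    }
    return null;
  }

  // Keeps only valid migrations, logging each rejected one at its problem level.
  static List<Migration> checkMigrationSanity(Set<String> current, List<Migration> migrations) {
    return migrations.stream().filter(m -> {
      Level level = problemLevel(m, current);
      if (level != null) {
        log.log(level, "Rejecting migration from balancer: {0}", m);
      }
      return level == null;
    }).collect(Collectors.toList());
  }
}
```

Checking the null fields first also guarantees `current.contains(...)` is never called with `null`, which some `Set` implementations reject.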
##########
File path:
core/src/main/java/org/apache/accumulo/core/spi/balancer/DefaultLoadBalancer.java
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
+import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
+import org.apache.accumulo.core.spi.balancer.data.TableStatistics;
+import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
+import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
+import org.apache.accumulo.core.spi.balancer.data.TabletStatistics;
+import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter;
+import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter.OutstandingMigrationsProblem;
+import org.apache.accumulo.core.spi.balancer.util.ThrottledBalancerProblemReporter.Problem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since 2.1.0
+ */
+public class DefaultLoadBalancer implements TabletBalancer {
Review comment:
I wonder if the interface should be called `TabletLoadBalancer` instead
of `TabletBalancer`. I think it might make things clearer.
For the default implementation, it might be nice to give it a name based
on what it does, rather than on the fact that it is set in the default config.
That way, if the default changes in the future, we won't have a class called
`DefaultLoadBalancer` that is not actually configured as the default
implementation. (Consider, for example, the "default Java GC" implementations,
which have specific names, and the selected default has changed over time.)
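To illustrate the naming idea, here is a sketch in which each implementation is named for its strategy and "default" exists only as a configuration value. `TabletLoadBalancer` is the name suggested above; `EvenTabletCountBalancer`, `PerTableBalancer`, `BalancerConfig`, and the string-keyed lookup are hypothetical placeholders, not Accumulo API (Accumulo selects the balancer by class name in its configuration):

```java
import java.util.Map;
import java.util.function.Supplier;

// Interface name suggested in the review (hypothetical here).
interface TabletLoadBalancer {
  String name();
}

// Hypothetical implementations, each named for what it does rather than for being the default.
final class EvenTabletCountBalancer implements TabletLoadBalancer {
  public String name() { return "even-tablet-count"; }
}

final class PerTableBalancer implements TabletLoadBalancer {
  public String name() { return "per-table"; }
}

final class BalancerConfig {
  // The only place that knows which implementation is the default; changing the
  // default later means editing this constant, not renaming a class.
  static final String DEFAULT_BALANCER = "even-tablet-count";

  private static final Map<String, Supplier<TabletLoadBalancer>> IMPLS = Map.of(
      "even-tablet-count", EvenTabletCountBalancer::new,
      "per-table", PerTableBalancer::new);

  // Builds the configured balancer, falling back to the default when nothing is set.
  static TabletLoadBalancer create(String configured) {
    String key = configured == null ? DEFAULT_BALANCER : configured;
    Supplier<TabletLoadBalancer> supplier = IMPLS.get(key);
    if (supplier == null) {
      throw new IllegalArgumentException("Unknown balancer: " + key);
    }
    return supplier.get();
  }
}
```

Switching the default to `PerTableBalancer` would then be a one-line change to `DEFAULT_BALANCER`, and no class name would become misleading.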
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.