kevinrr888 commented on code in PR #5537: URL: https://github.com/apache/accumulo/pull/5537#discussion_r2166819210
########## server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java: ########## @@ -321,6 +373,83 @@ void getAssignments(SortedMap<TServerInstance,TabletServerStatus> currentStatus, Map::putAll), assignedOut); tabletBalancer.getAssignments(params); + if (!canAssignAndBalance()) { + // remove assignment for user tables + Iterator<KeyExtent> iter = assignedOut.keySet().iterator(); + while (iter.hasNext()) { + KeyExtent ke = iter.next(); + if (!ke.isMeta()) { + iter.remove(); + log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); + } + } + } + } + + private boolean canAssignAndBalance() { + final int threshold = getManager().getConfiguration() + .getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); + if (threshold == 0) { + return true; + } + final int numTServers = getManager().tserverSet.size(); + final boolean result = numTServers >= threshold; + if (!result) { + log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", + numTServers, threshold); + } + return result; + } + + boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { Review Comment: nitpick: this can be private ########## server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java: ########## @@ -321,6 +373,83 @@ void getAssignments(SortedMap<TServerInstance,TabletServerStatus> currentStatus, Map::putAll), assignedOut); tabletBalancer.getAssignments(params); + if (!canAssignAndBalance()) { + // remove assignment for user tables + Iterator<KeyExtent> iter = assignedOut.keySet().iterator(); + while (iter.hasNext()) { + KeyExtent ke = iter.next(); + if (!ke.isMeta()) { + iter.remove(); + log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); + } + } + } + } + + private boolean canAssignAndBalance() { + final int threshold = getManager().getConfiguration() + .getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); + if (threshold == 0) { + return 
true; + } + final int numTServers = getManager().tserverSet.size(); + final boolean result = numTServers >= threshold; + if (!result) { + log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", + numTServers, threshold); + } + return result; + } + + boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { + var tableState = getContext().getTableManager().getTableState(tabletMetadata.getTableId()); + var migration = tabletMetadata.getMigration(); + Preconditions.checkState(migration != null, + "This method should only be called if there is a migration"); + return tableState == TableState.OFFLINE + || !getManager().onlineTabletServers().contains(migration) + || (tabletMetadata.getLocation() != null + && tabletMetadata.getLocation().getServerInstance().equals(migration)); + } + + public void upgradeComplete() { Review Comment: nitpick: this can be package-private. Also, why is it called "upgradeComplete"? ########## server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java: ########## @@ -36,61 +42,90 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.TableInfo; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; -import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; +import 
org.apache.accumulo.core.spi.balancer.DoNothingBalancer; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.manager.state.UnassignedTablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class BalanceManager { private static final Logger log = LoggerFactory.getLogger(BalanceManager.class); - private final Manager manager; + private final AtomicReference<Manager> manager; protected volatile TabletBalancer tabletBalancer; - private final BalancerEnvironment balancerEnvironment; + private volatile BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); private final Object balancedNotifier = new Object(); + private static final long CLEANUP_INTERVAL_MINUTES = Manager.CLEANUP_INTERVAL_MINUTES; - BalanceManager(Manager manager) { - this.manager = manager; - this.balancerEnvironment = new BalancerEnvironmentImpl(manager.getContext()); - initializeBalancer(); + BalanceManager() { + this.manager = new AtomicReference<>(null); } - private void initializeBalancer() { - var localTabletBalancer = - Property.createInstanceFromPropertyName(getContext().getConfiguration(), - Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new TableLoadBalancer()); - localTabletBalancer.init(balancerEnvironment); - tabletBalancer = localTabletBalancer; - log.info("Setup new balancer instance {}", tabletBalancer.getClass().getName()); + 
public void setManager(Manager manager) { + Objects.requireNonNull(manager); + if (this.manager.compareAndSet(null, manager)) { + this.balancerEnvironment = new BalancerEnvironmentImpl(manager.getContext()); + initializeBalancer(); + } else if (this.manager.get() != manager) { + throw new IllegalStateException("Attempted to set different manager object"); + } } - void propertyChanged(String property) { - if (property.equals(Property.MANAGER_TABLET_BALANCER.getKey())) { - initializeBalancer(); + private Manager getManager() { + // fail fast if not yet set + return Objects.requireNonNull(manager.get()); + } + + private void initializeBalancer() { + String configuredBalancerClass = + getManager().getConfiguration().get(Property.MANAGER_TABLET_BALANCER); + try { + if (tabletBalancer == null + || !tabletBalancer.getClass().getName().equals(configuredBalancerClass)) { + log.debug("Attempting to initialize balancer using class {}, was {}", + configuredBalancerClass, + tabletBalancer == null ? "null" : tabletBalancer.getClass().getName()); + var localTabletBalancer = + Property.createInstanceFromPropertyName(getManager().getConfiguration(), + Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new DoNothingBalancer()); + localTabletBalancer.init(balancerEnvironment); + tabletBalancer = localTabletBalancer; + log.info("tablet balancer changed to {}", localTabletBalancer.getClass().getName()); + } Review Comment: Racy: non-atomic op on volatile var tabletBalancer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org