keith-turner commented on code in PR #3409:
URL: https://github.com/apache/accumulo/pull/3409#discussion_r1202330484
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java:
##########
@@ -108,51 +108,51 @@ public void setFutureLocations(Collection<Assignment>
assignments)
}
@Override
- public void unassign(Collection<TabletLocationState> tablets,
+ public void unassign(Collection<TabletMetadata> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers) throws
DistributedStoreException {
unassign(tablets, logsForDeadServers, -1);
}
@Override
- public void suspend(Collection<TabletLocationState> tablets,
+ public void suspend(Collection<TabletMetadata> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
throws DistributedStoreException {
unassign(tablets, logsForDeadServers, suspensionTimestamp);
}
protected abstract void processSuspension(Ample.ConditionalTabletMutator
tabletMutator,
- TabletLocationState tls, long suspensionTimestamp);
+ TabletMetadata tm, long suspensionTimestamp);
- private void unassign(Collection<TabletLocationState> tablets,
+ private void unassign(Collection<TabletMetadata> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
throws DistributedStoreException {
try (var tabletsMutator = ample.conditionallyMutateTablets()) {
- for (TabletLocationState tls : tablets) {
- var tabletMutator =
tabletsMutator.mutateTablet(tls.extent).requireAbsentOperation();
+ for (TabletMetadata tm : tablets) {
+ var tabletMutator =
tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation();
- if (tls.hasCurrent()) {
- tabletMutator.requireLocation(tls.current);
+ if (tm.hasCurrent()) {
+ tabletMutator.requireLocation(tm.getLocation());
ManagerMetadataUtil.updateLastForAssignmentMode(context,
tabletMutator,
- tls.current.getServerInstance(), tls.last);
- tabletMutator.deleteLocation(tls.current);
+ tm.getLocation().getServerInstance(), tm.getLast());
+ tabletMutator.deleteLocation(tm.getLocation());
if (logsForDeadServers != null) {
- List<Path> logs =
logsForDeadServers.get(tls.current.getServerInstance());
+ List<Path> logs =
logsForDeadServers.get(tm.getLocation().getServerInstance());
if (logs != null) {
for (Path log : logs) {
- LogEntry entry = new LogEntry(tls.extent, 0, log.toString());
+ LogEntry entry = new LogEntry(tm.getExtent(), 0,
log.toString());
tabletMutator.putWal(entry);
}
}
}
}
- if (tls.hasFuture()) {
- tabletMutator.requireLocation(tls.future);
- tabletMutator.deleteLocation(tls.future);
+ if (tm.getLocation().getType().equals(LocationType.FUTURE)) {
Review Comment:
If tm has no location, this could throw an NPE. The old code seemed to handle
the case of no current or future location. Not sure in general if the code
should handle this case; I just noticed the difference in behavior.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
Review Comment:
Another type later in the code used the word Management. Made me think this
could be called `TabletManagementIterator`.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
Review Comment:
Since a collection is passed in, it would be better to pass in an empty set
instead of null and remove the null handling from this code. This behavior was
carried forward from the old code, so it is not something to worry about in this
PR; there are already enough changes. It would be a nice follow-on improvement,
as the null handling could mask bugs.
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -888,12 +889,12 @@ private long updateStatus() {
} else if (!serversToShutdown.isEmpty()) {
log.debug("not balancing while shutting down servers {}",
serversToShutdown);
} else {
- for (TabletGroupWatcher tgw : watchers) {
- if (!tgw.isSameTserversAsLastScan(currentServers)) {
- log.debug("not balancing just yet, as collection of live tservers
is in flux");
- return DEFAULT_WAIT_FOR_WATCHER;
- }
- }
+ // for (TabletGroupWatcher tgw : watchers) {
Review Comment:
What is going on here? Why is this loop commented out?
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
+ if (sendTabletToManager(tm, reasonsToReturnThisTablet)) {
+ // If we simply returned here, then the client would get the encoded
K,V
+ // from the WholeRowIterator. However, it would not know the reason(s)
why
+ // it was returned. Insert a K,V pair to represent the reasons. The
client
+ // can pull this K,V pair from the results by looking at the colf.
+ ManagerTabletInfo.addActions(decodedRow, reasonsToReturnThisTablet);
+ topKey = decodedRow.firstKey();
+ topValue = WholeRowIterator.encodeRow(new
ArrayList<>(decodedRow.keySet()),
+ new ArrayList<>(decodedRow.values()));
+ LOG.debug("Returning extent with reasons: {}",
reasonsToReturnThisTablet);
+ return;
+ }
+
+ LOG.debug("No reason to return this extent, continuing");
+ getSource().next();
+ }
+ }
+
+ /**
+ * Evaluates whether or not this Tablet should be returned so that it can be
acted upon by the
+ * Manager
+ */
+ private boolean sendTabletToManager(final TabletMetadata tm,
+ final Set<ManagementAction> reasonsToReturnThisTablet) {
+
+ reasonsToReturnThisTablet.clear();
+
+ if (onlineTables == null || current == null || managerState !=
ManagerState.NORMAL
Review Comment:
I do not understand why the following conditions result in a bad state. Can
you explain?
`onlineTables == null || current == null || managerState !=
ManagerState.NORMAL`
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
+ if (sendTabletToManager(tm, reasonsToReturnThisTablet)) {
+ // If we simply returned here, then the client would get the encoded
K,V
+ // from the WholeRowIterator. However, it would not know the reason(s)
why
+ // it was returned. Insert a K,V pair to represent the reasons. The
client
+ // can pull this K,V pair from the results by looking at the colf.
+ ManagerTabletInfo.addActions(decodedRow, reasonsToReturnThisTablet);
+ topKey = decodedRow.firstKey();
+ topValue = WholeRowIterator.encodeRow(new
ArrayList<>(decodedRow.keySet()),
+ new ArrayList<>(decodedRow.values()));
+ LOG.debug("Returning extent with reasons: {}",
reasonsToReturnThisTablet);
+ return;
+ }
+
+ LOG.debug("No reason to return this extent, continuing");
+ getSource().next();
+ }
+ }
+
+ /**
+ * Evaluates whether or not this Tablet should be returned so that it can be
acted upon by the
+ * Manager
+ */
+ private boolean sendTabletToManager(final TabletMetadata tm,
+ final Set<ManagementAction> reasonsToReturnThisTablet) {
+
+ reasonsToReturnThisTablet.clear();
+
+ if (onlineTables == null || current == null || managerState !=
ManagerState.NORMAL
+ || tm.isFutureAndCurrentLocationSet()) {
+ // no need to check everything, we are in a known state where we want to
return everything.
+ reasonsToReturnThisTablet.add(ManagementAction.BAD_STATE);
+ return true;
+ }
+
+ // we always want data about merges
+ final MergeInfo merge = merges.get(tm.getTableId());
+ if (merge != null) {
+ // could make this smarter by only returning if the tablet is involved
in the merge
+ reasonsToReturnThisTablet.add(ManagementAction.IS_MERGING);
+ }
+
+ // always return the information for migrating tablets
+ if (migrations.contains(tm.getExtent())) {
+ reasonsToReturnThisTablet.add(ManagementAction.IS_MIGRATING);
+ }
+
+ if (shouldReturnDueToLocation(tm, onlineTables, current, debug)) {
+ reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
+ }
+
+ final long splitThreshold =
+ ConfigurationTypeHelper.getFixedMemoryAsBytes(this.env.getPluginEnv()
+
.getConfiguration(tm.getTableId()).get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
+ if (shouldReturnDueToSplit(tm, splitThreshold)) {
+ reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING);
+ }
+
+ // TODO: Add compaction logic
+
+ if (!reasonsToReturnThisTablet.isEmpty()) {
+ return true;
Review Comment:
I'm not sure how it would turn out, but I wonder whether it would be a bit
cleaner if the calling code just checked whether the set is empty instead of
having this method return a boolean. I think that would clean up this method,
but it may make the calling code a bit less concise.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
+ if (sendTabletToManager(tm, reasonsToReturnThisTablet)) {
+ // If we simply returned here, then the client would get the encoded
K,V
+ // from the WholeRowIterator. However, it would not know the reason(s)
why
+ // it was returned. Insert a K,V pair to represent the reasons. The
client
+ // can pull this K,V pair from the results by looking at the colf.
+ ManagerTabletInfo.addActions(decodedRow, reasonsToReturnThisTablet);
+ topKey = decodedRow.firstKey();
+ topValue = WholeRowIterator.encodeRow(new
ArrayList<>(decodedRow.keySet()),
+ new ArrayList<>(decodedRow.values()));
+ LOG.debug("Returning extent with reasons: {}",
reasonsToReturnThisTablet);
Review Comment:
```suggestion
LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(),
reasonsToReturnThisTablet);
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
Review Comment:
If topKey and topValue are set to null at the start of consume(), it may be
possible to drop the next() and seek() overrides.
```suggestion
@Override
protected void consume() throws IOException {
topKey = null;
topValue = null;
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java:
##########
@@ -134,84 +87,19 @@ public boolean hasNext() {
}
@Override
- public TabletLocationState next() {
+ public ManagerTabletInfo next() {
if (closed.get()) {
throw new NoSuchElementException(this.getClass().getSimpleName() + " is
closed");
}
+ Entry<Key,Value> e = iter.next();
try {
- Entry<Key,Value> e = iter.next();
- return createTabletLocationState(e.getKey(), e.getValue());
- } catch (IOException | BadLocationStateException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public static TabletLocationState createTabletLocationState(Key k, Value v)
- throws IOException, BadLocationStateException {
- final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
- KeyExtent extent = null;
- Location future = null;
- Location current = null;
- Location last = null;
- SuspendingTServer suspend = null;
- long lastTimestamp = 0;
- List<Collection<String>> walogs = new ArrayList<>();
- boolean chopped = false;
- TabletHostingGoal goal = TabletHostingGoal.ONDEMAND;
- boolean onDemandHostingRequested = false;
-
- for (Entry<Key,Value> entry : decodedRow.entrySet()) {
-
- Key key = entry.getKey();
- Text row = key.getRow();
- Text cf = key.getColumnFamily();
- Text cq = key.getColumnQualifier();
-
- if (cf.compareTo(FutureLocationColumnFamily.NAME) == 0) {
- Location location = Location.future(new
TServerInstance(entry.getValue(), cq));
- if (future != null) {
- throw new BadLocationStateException("found two assignments for the
same extent " + row
- + ": " + future + " and " + location, row);
- }
- future = location;
- } else if (cf.compareTo(CurrentLocationColumnFamily.NAME) == 0) {
- Location location = Location.current(new
TServerInstance(entry.getValue(), cq));
- if (current != null) {
- throw new BadLocationStateException("found two locations for the
same extent " + row
- + ": " + current + " and " + location, row);
- }
- current = location;
- } else if (cf.compareTo(LogColumnFamily.NAME) == 0) {
- String[] split =
entry.getValue().toString().split("\\|")[0].split(";");
- walogs.add(Arrays.asList(split));
- } else if (cf.compareTo(LastLocationColumnFamily.NAME) == 0) {
- if (lastTimestamp < entry.getKey().getTimestamp()) {
- last = Location.last(new TServerInstance(entry.getValue(), cq));
- lastTimestamp = entry.getKey().getTimestamp();
- }
- } else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) {
- chopped = true;
- } else if (TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) {
- extent = KeyExtent.fromMetaPrevRow(entry);
- } else if (SuspendLocationColumn.SUSPEND_COLUMN.equals(cf, cq)) {
- suspend = SuspendingTServer.fromValue(entry.getValue());
- } else if (HostingColumnFamily.GOAL_COLUMN.equals(cf, cq)) {
- goal = TabletHostingGoalUtil.fromValue(entry.getValue());
- } else if (HostingColumnFamily.REQUESTED_COLUMN.equals(cf, cq)) {
- onDemandHostingRequested = true;
- }
- }
- if (extent == null) {
- String msg = "No prev-row for key extent " + decodedRow;
- log.error(msg);
- throw new BadLocationStateException(msg, k.getRow());
- }
- // Override the goal for root and metadata table, should be always
- if (extent.isMeta()) {
- goal = TabletHostingGoal.ALWAYS;
+ ManagerTabletInfo tmi = ManagerTabletInfoIterator.decode(e);
+ log.debug("Returning metadata tablet, extent: {}, hostingGoal: {}",
Review Comment:
```suggestion
log.trace("Returning metadata tablet, extent: {}, hostingGoal: {}",
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
+ if (sendTabletToManager(tm, reasonsToReturnThisTablet)) {
+ // If we simply returned here, then the client would get the encoded
K,V
+ // from the WholeRowIterator. However, it would not know the reason(s)
why
+ // it was returned. Insert a K,V pair to represent the reasons. The
client
+ // can pull this K,V pair from the results by looking at the colf.
+ ManagerTabletInfo.addActions(decodedRow, reasonsToReturnThisTablet);
+ topKey = decodedRow.firstKey();
+ topValue = WholeRowIterator.encodeRow(new
ArrayList<>(decodedRow.keySet()),
+ new ArrayList<>(decodedRow.values()));
+ LOG.debug("Returning extent with reasons: {}",
reasonsToReturnThisTablet);
+ return;
+ }
+
+ LOG.debug("No reason to return this extent, continuing");
Review Comment:
```suggestion
LOG.trace("No reason to return this extent {}, continuing",
tm.getExtent());
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java:
##########
@@ -20,49 +20,25 @@
import java.io.IOException;
import java.lang.ref.Cleaner.Cleanable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.metadata.SuspendingTServer;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletLocationState;
-import
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.cleaner.CleanerUtil;
-import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MetaDataTableScanner implements
ClosableIterator<TabletLocationState> {
Review Comment:
This name is very generic. It could be renamed to TabletManagementScanner.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
+ if (sendTabletToManager(tm, reasonsToReturnThisTablet)) {
+ // If we simply returned here, then the client would get the encoded
K,V
+ // from the WholeRowIterator. However, it would not know the reason(s)
why
+ // it was returned. Insert a K,V pair to represent the reasons. The
client
+ // can pull this K,V pair from the results by looking at the colf.
+ ManagerTabletInfo.addActions(decodedRow, reasonsToReturnThisTablet);
+ topKey = decodedRow.firstKey();
+ topValue = WholeRowIterator.encodeRow(new
ArrayList<>(decodedRow.keySet()),
+ new ArrayList<>(decodedRow.values()));
+ LOG.debug("Returning extent with reasons: {}",
reasonsToReturnThisTablet);
+ return;
+ }
+
+ LOG.debug("No reason to return this extent, continuing");
+ getSource().next();
+ }
+ }
+
+ /**
+ * Evaluates whether or not this Tablet should be returned so that it can be
acted upon by the
+ * Manager
+ */
+ private boolean sendTabletToManager(final TabletMetadata tm,
Review Comment:
Could name this something like `getManagementActions()`,
`computeManagementActions()`, or `evaluateManagementActions()`. The name of the
method is misleading in isolation, but it does actually flow with the if
statement where it's used.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ManagerTabletInfoIterator.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
+import
org.apache.accumulo.core.manager.state.ManagerTabletInfo.ManagementAction;
+import org.apache.accumulo.core.manager.thrift.ManagerState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+public class ManagerTabletInfoIterator extends SkippingIterator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManagerTabletInfoIterator.class);
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
+ private static final String MANAGER_STATE_OPTION = "managerState";
+ private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
+
+ private static void setCurrentServers(final IteratorSetting cfg,
+ final Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<>();
+ for (TServerInstance server : goodServers) {
+ servers.add(server.getHostPortSession());
+ }
+ cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static void setOnlineTables(final IteratorSetting cfg, final
Set<TableId> onlineTables) {
+ if (onlineTables != null) {
+ cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+ }
+ }
+
+ private static void setMerges(final IteratorSetting cfg, final
Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+ private static void setMigrations(final IteratorSetting cfg,
+ final Collection<KeyExtent> migrations) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) {
+ extent.writeTo(buffer);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded =
+ Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(),
buffer.getLength()));
+ cfg.addOption(MIGRATIONS_OPTION, encoded);
+ }
+
+ private static void setManagerState(final IteratorSetting cfg, final
ManagerState state) {
+ cfg.addOption(MANAGER_STATE_OPTION, state.toString());
+ }
+
+ private static void setShuttingDown(final IteratorSetting cfg,
+ final Set<TServerInstance> servers) {
+ if (servers != null) {
+ cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+ }
+ }
+
+ private static Set<KeyExtent> parseMigrations(final String migrations) {
+ if (migrations == null) {
+ return Collections.emptySet();
+ }
+ try {
+ Set<KeyExtent> result = new HashSet<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(migrations);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ result.add(KeyExtent.readFrom(buffer));
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static Set<TableId> parseTableIDs(final String tableIDs) {
+ if (tableIDs == null) {
+ return null;
+ }
+ Set<TableId> result = new HashSet<>();
+ for (String tableID : tableIDs.split(",")) {
+ result.add(TableId.of(tableID));
+ }
+ return result;
+ }
+
+ private static Set<TServerInstance> parseServers(final String servers) {
+ if (servers == null) {
+ return null;
+ }
+ // parse "host:port[INSTANCE]"
+ Set<TServerInstance> result = new HashSet<>();
+ if (!servers.isEmpty()) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ String hostport = parts[0];
+ String instance = parts[1];
+ if (instance != null && instance.endsWith("]")) {
+ instance = instance.substring(0, instance.length() - 1);
+ }
+ result.add(new TServerInstance(AddressUtil.parseAddress(hostport,
false), instance));
+ }
+ }
+ return result;
+ }
+
+ private static Map<TableId,MergeInfo> parseMerges(final String merges) {
+ if (merges == null) {
+ return null;
+ }
+ try {
+ Map<TableId,MergeInfo> result = new HashMap<>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.getDecoder().decode(merges);
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.tableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
+ final long splitThreshold) {
+ return tm.getFilesMap().values().stream().map(DataFileValue::getSize)
+ .collect(Collectors.summarizingLong(Long::longValue)).getSum() >
splitThreshold;
+ }
+
+ private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ final Set<TableId> onlineTables, final Set<TServerInstance> current,
final boolean debug) {
+ // is the table supposed to be online or offline?
+ final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+
+ if (debug) {
+ LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {},
hostingRequested: {}",
+ tm.getExtent(), getLocationState(current, tm), (shouldBeOnline ?
"on" : "off"),
+ tm.getHostingGoal(), tm.getHostingRequested());
+ }
+ switch (getLocationState(current, tm)) {
+ case ASSIGNED:
+ // we always want data about assigned tablets
+ return true;
+ case HOSTED:
+ if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
!tm.getHostingRequested())) {
+ return true;
+ }
+ break;
+ case ASSIGNED_TO_DEAD_SERVER:
+ return true;
+ case SUSPENDED:
+ case UNASSIGNED:
+ if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS
+ || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND &&
tm.getHostingRequested()))) {
+ return true;
+ }
+ break;
+ default:
+ throw new AssertionError(
+ "Inconceivable! The tablet is an unrecognized state: " +
getLocationState(current, tm));
+ }
+ return false;
+ }
+
+ private static TabletState getLocationState(final Set<TServerInstance>
liveServers,
+ final TabletMetadata tm) {
+ Location loc = tm.getLocation();
+
+ if (loc == null || loc.getType() == null || loc.getServerInstance() ==
null) {
+ return TabletState.UNASSIGNED;
+ }
+
+ if (loc.getType().equals(LocationType.FUTURE)) {
+ return liveServers.contains(loc.getServerInstance()) ?
TabletState.ASSIGNED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (loc.getType().equals(LocationType.CURRENT)) {
+ return liveServers.contains(loc.getServerInstance()) ? TabletState.HOSTED
+ : TabletState.ASSIGNED_TO_DEAD_SERVER;
+ } else if (tm.getSuspend() != null) {
+ return TabletState.SUSPENDED;
+ } else {
+ return TabletState.UNASSIGNED;
+ }
+ }
+
+ public static void configureScanner(final ScannerBase scanner, final
CurrentState state) {
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
+
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows",
WholeRowIterator.class));
+ IteratorSetting tabletChange =
+ new IteratorSetting(1001, "ManagerTabletInfoIterator",
ManagerTabletInfoIterator.class);
+ if (state != null) {
+ ManagerTabletInfoIterator.setCurrentServers(tabletChange,
state.onlineTabletServers());
+ ManagerTabletInfoIterator.setOnlineTables(tabletChange,
state.onlineTables());
+ ManagerTabletInfoIterator.setMerges(tabletChange, state.merges());
+ ManagerTabletInfoIterator.setMigrations(tabletChange,
state.migrationsSnapshot());
+ ManagerTabletInfoIterator.setManagerState(tabletChange,
state.getManagerState());
+ ManagerTabletInfoIterator.setShuttingDown(tabletChange,
state.shutdownServers());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public static ManagerTabletInfo decode(Entry<Key,Value> e) throws
IOException {
+ return new ManagerTabletInfo(e.getKey(), e.getValue());
+ }
+
+ private Set<TServerInstance> current;
+ private Set<TableId> onlineTables;
+ private Map<TableId,MergeInfo> merges;
+ private boolean debug = false;
+ private Set<KeyExtent> migrations;
+ private ManagerState managerState = ManagerState.NORMAL;
+ private IteratorEnvironment env;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.env = env;
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ try {
+ managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
+ } catch (Exception ex) {
+ if (options.get(MANAGER_STATE_OPTION) != null) {
+ LOG.error("Unable to decode managerState {}",
options.get(MANAGER_STATE_OPTION));
+ }
+ }
+ Set<TServerInstance> shuttingDown =
parseServers(options.get(SHUTTING_DOWN_OPTION));
+ if (current != null && shuttingDown != null) {
+ current.removeAll(shuttingDown);
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null && topValue != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ topKey = null;
+ topValue = null;
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
+ protected void consume() throws IOException {
+
+ final Set<ManagementAction> reasonsToReturnThisTablet = new HashSet<>();
+ while (getSource().hasTop()) {
+ final Key k = getSource().getTopKey();
+ final Value v = getSource().getTopValue();
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ final TabletMetadata tm =
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
+ ManagerTabletInfo.CONFIGURED_COLUMNS, false, true);
+
+ LOG.debug("Evaluating extent: {}", tm);
Review Comment:
```suggestion
LOG.trace("Evaluating extent: {}", tm);
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java:
##########
@@ -25,16 +25,17 @@
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.manager.state.ManagerTabletInfo;
import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
/**
* Interface for storing information about tablet assignments. There are three
implementations:
*/
-public interface TabletStateStore extends Iterable<TabletLocationState> {
+public interface TabletStateStore extends Iterable<ManagerTabletInfo> {
Review Comment:
Many of the reasons behind the creation of this abstraction are going away.
It used to abstract writing to a metadata tablet vs. ZooKeeper, but much of
that has been pushed more generally into Ample. The changes in this PR remove
the TabletLocationState class that used to go with this interface, and the
interface used to abstract the computation of TabletLocationState. I am
wondering whether this interface is on its way to becoming a vestigial
artifact that could be removed, resulting in an overall simplification of the
code. Maybe it still has a place; I am not sure.
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java:
##########
@@ -42,16 +39,14 @@ class ZooTabletStateStore extends AbstractTabletStateStore
implements TabletStat
private static final Logger log =
LoggerFactory.getLogger(ZooTabletStateStore.class);
private final Ample ample;
- private final ClientContext context;
ZooTabletStateStore(ClientContext context) {
super(context);
- this.context = context;
this.ample = context.getAmple();
}
@Override
- public ClosableIterator<TabletLocationState> iterator() {
+ public ClosableIterator<ManagerTabletInfo> iterator() {
Review Comment:
In the past, this method could always return a tablet location state even if
there was nothing to do. Now I think it should only return something if there
is work to do. I am also thinking it should somehow call the code in
ManagerTabletInfoIterator. The following does not compile, but I worked it up
as a proof of concept of what I was thinking.
```java
/**
 * Proof-of-concept iterator over the root tablet's management info.
 *
 * <p>Reads the root tablet's metadata once and yields a single {@code ManagerTabletInfo} only
 * when {@code ManagerTabletInfoIterator.sendToTabletServer} returns true for it; otherwise the
 * returned iterator is empty.
 *
 * @return a closable iterator holding zero or one element; {@code remove()} is unsupported and
 *         {@code close()} does nothing
 */
@Override
public ClosableIterator<ManagerTabletInfo> iterator() {
  // Single read of the root tablet; the anonymous iterator below must not read it again.
  TabletMetadata tabletMetadata = ample.readTablet(RootTable.EXTENT, ReadConsistency.EVENTUAL);

  // NOTE(review): presumably sendToTabletServer fills in `reasons` as a side effect - confirm.
  Set<ManagementAction> reasons = new HashSet<>();
  List<ManagerTabletInfo> mtis = List.of();
  if (ManagerTabletInfoIterator.sendToTabletServer(tabletMetadata, reasons)) {
    mtis = List.of(new ManagerTabletInfo(reasons, tabletMetadata));
  }

  var iterator = mtis.iterator();
  return new ClosableIterator<>() {
    @Override
    public boolean hasNext() {
      return iterator.hasNext();
    }

    @Override
    public ManagerTabletInfo next() {
      return iterator.next();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }

    @Override
    public void close() {}
  };
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]