This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
     new f2b0936  Improve client table exceptions (#2195)
f2b0936 is described below

commit f2b0936029fcaadb925bd9c229403c82f0283211
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Fri Jul 9 12:22:20 2021 -0400

    Improve client table exceptions (#2195)

    Improve client-side table exception handling by providing tableNames
    from the client to avoid looking them up later to format an exception
    message.

    * Add TableOfflineException constructor to provide a formatted message
      without requiring a ClientContext to look up the name or id
    * Reuse code to check for TableNotFoundException, TableDeletedException,
      and TableOfflineException using convenience methods in ClientContext
    * Pass along tableName alongside tableId, in some internal client-side
      code, for use in exception messages, if needed later
    * Make check for requiring that a table not be offline explicit, rather
      than entangled with the tableId lookup in ClientContext
    * Update ConnectorImpl to proxy to ClientContext in a few cases where it
      previously wasn't, to ensure consistent behavior and argument
      validations
    * Use TableId from Thrift exception to avoid another client-side lookup
      when a fate operation throws an exception because the table is offline
    * Simplify private addSplit method by passing SplitEnv, rather than
      sending env.tableId and env.tableName as separate parameters
    * Remove some redundant tableName validations when they are already
      validated by context.getTableId
    * Simplify and inline a private updateAuthorizationsFailures method in
      TabletServerBatchWriter that was used to gather a set of authorizations
      failures whose tables were then checked for being deleted
    * Fix a few TableState enum comparisons from `.equals()` to `==`
---
 .../core/client/TableOfflineException.java         | 13 ++++
 .../core/client/mapred/AbstractInputFormat.java    | 10 +--
 .../core/client/mapreduce/AbstractInputFormat.java | 10 +--
.../accumulo/core/clientImpl/ClientContext.java | 44 ++++++++--- .../core/clientImpl/ConditionalWriterImpl.java | 17 ++--- .../accumulo/core/clientImpl/ConnectorImpl.java | 14 ++-- .../core/clientImpl/ReplicationOperationsImpl.java | 48 ++---------- .../core/clientImpl/TableOperationsImpl.java | 85 +++++++++------------- .../apache/accumulo/core/clientImpl/Tables.java | 12 --- .../core/clientImpl/TabletServerBatchDeleter.java | 4 +- .../core/clientImpl/TabletServerBatchReader.java | 12 +-- .../TabletServerBatchReaderIterator.java | 16 ++-- .../core/clientImpl/TabletServerBatchWriter.java | 44 ++++------- .../accumulo/core/clientImpl/ThriftScanner.java | 13 +--- .../clientImpl/TabletServerBatchReaderTest.java | 5 +- .../hadoopImpl/mapred/AccumuloRecordReader.java | 9 +-- .../hadoopImpl/mapreduce/AccumuloRecordReader.java | 9 +-- .../server/master/balancer/TableLoadBalancer.java | 4 +- .../java/org/apache/accumulo/manager/Manager.java | 5 +- .../monitor/rest/tables/TablesResource.java | 2 +- 20 files changed, 150 insertions(+), 226 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java b/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java index 2a8d8dd..82aed83 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java +++ b/core/src/main/java/org/apache/accumulo/core/client/TableOfflineException.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.core.client; +import org.apache.accumulo.core.data.TableId; + public class TableOfflineException extends RuntimeException { private static final long serialVersionUID = 1L; @@ -38,6 +40,17 @@ public class TableOfflineException extends RuntimeException { } /** + * @since 2.1.0 + */ + public TableOfflineException(TableId tableId, String tableName) { + // @formatter:off + super(String.format("Table %s (%s) is offline", + tableName == null ? "<unknown table>" : tableName, + tableId == null ? 
"<unknown id>" : tableId)); + // @formatter:on + } + + /** * @since 2.0.0 */ public TableOfflineException(Exception cause) { diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java index cb77cac..1015fa2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -41,9 +41,7 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -64,7 +62,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; @@ -698,11 +695,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { tl.invalidateCache(); while (!tl.binRanges(client, ranges, binnedRanges).isEmpty()) { - String tableIdStr = tableId.canonical(); - if (!Tables.exists(client, tableId)) - throw new TableDeletedException(tableIdStr); - if (Tables.getTableState(client, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(client, tableId)); + 
client.requireNotDeleted(tableId); + client.requireNotOffline(tableId, tableName); binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); // sleep randomly between 100 and 200 ms diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index 5603b91..18eef7e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -41,9 +41,7 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -64,7 +62,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.Text; @@ -729,11 +726,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { tl.invalidateCache(); while (!tl.binRanges(client, ranges, binnedRanges).isEmpty()) { - String tableIdStr = tableId.canonical(); - if (!Tables.exists(client, tableId)) - throw new TableDeletedException(tableIdStr); - if (Tables.getTableState(client, tableId) == 
TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(client, tableId)); + client.requireNotDeleted(tableId); + client.requireNotOffline(tableId, tableName); binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); // sleep randomly between 100 and 200 ms diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 35623e7..70425f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.InstanceOperations; @@ -511,9 +512,27 @@ public class ClientContext implements AccumuloClient { // this validates the table name for all callers TableId getTableId(String tableName) throws TableNotFoundException { - TableId tableId = Tables.getTableId(this, EXISTING_TABLE_NAME.validate(tableName)); + return Tables.getTableId(this, EXISTING_TABLE_NAME.validate(tableName)); + } + + // use cases overlap with requireNotDeleted, but this throws a checked exception + public TableId requireTableExists(TableId tableId, String tableName) + throws TableNotFoundException { + if (!Tables.exists(this, tableId)) + throw new TableNotFoundException(tableId.canonical(), tableName, "Table no longer exists"); + return tableId; + } + + // use cases overlap with requireTableExists, but this throws a runtime exception + public TableId requireNotDeleted(TableId 
tableId) { + if (!Tables.exists(this, tableId)) + throw new TableDeletedException(tableId.canonical()); + return tableId; + } + + public TableId requireNotOffline(TableId tableId, String tableName) { if (Tables.getTableState(this, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(this, tableId)); + throw new TableOfflineException(tableId, tableName); return tableId; } @@ -522,8 +541,8 @@ public class ClientContext implements AccumuloClient { int numQueryThreads) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchReader(this, getTableId(tableName), authorizations, - numQueryThreads); + return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName), + tableName, authorizations, numQueryThreads); } @Override @@ -548,8 +567,8 @@ public class ClientContext implements AccumuloClient { int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(this, getTableId(tableName), authorizations, - numQueryThreads, config.merge(getBatchWriterConfig())); + return new TabletServerBatchDeleter(this, requireNotOffline(getTableId(tableName), tableName), + tableName, authorizations, numQueryThreads, config.merge(getBatchWriterConfig())); } @Override @@ -567,7 +586,8 @@ public class ClientContext implements AccumuloClient { if (config == null) { config = new BatchWriterConfig(); } - return new BatchWriterImpl(this, getTableId(tableName), config.merge(getBatchWriterConfig())); + return new BatchWriterImpl(this, requireNotOffline(getTableId(tableName), tableName), + config.merge(getBatchWriterConfig())); } @Override @@ -593,14 +613,15 @@ public class ClientContext implements AccumuloClient { if (config == null) { config = new ConditionalWriterConfig(); } - return new ConditionalWriterImpl(this, 
getTableId(tableName), - config.merge(getConditionalWriterConfig())); + return new ConditionalWriterImpl(this, requireNotOffline(getTableId(tableName), tableName), + tableName, config.merge(getConditionalWriterConfig())); } @Override public ConditionalWriter createConditionalWriter(String tableName) throws TableNotFoundException { ensureOpen(); - return new ConditionalWriterImpl(this, getTableId(tableName), new ConditionalWriterConfig()); + return new ConditionalWriterImpl(this, requireNotOffline(getTableId(tableName), tableName), + tableName, new ConditionalWriterConfig()); } @Override @@ -608,7 +629,8 @@ public class ClientContext implements AccumuloClient { throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations); + Scanner scanner = + new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations); Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties()); if (batchSize != null) { scanner.setBatchSize(batchSize); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 15d57ed..83c6dff 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -46,8 +46,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Durability; -import org.apache.accumulo.core.client.TableDeletedException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import 
org.apache.accumulo.core.clientImpl.TabletLocator.TabletServerMutations; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -63,7 +61,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation; import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TMutation; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; @@ -106,6 +103,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private final ClientContext context; private TabletLocator locator; private final TableId tableId; + private final String tableName; private long timeout; private final Durability durability; private final String classLoaderContext; @@ -287,11 +285,10 @@ class ConditionalWriterImpl implements ConditionalWriter { try { locator.binMutations(context, mutations, binnedMutations, failures); - if (failures.size() == mutations.size()) - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableId.canonical()); - else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + if (failures.size() == mutations.size()) { + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, tableName); + } } catch (Exception e) { mutations.forEach(qcm -> qcm.queueResult(new Result(e, qcm, null))); @@ -364,7 +361,8 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - ConditionalWriterImpl(ClientContext context, TableId tableId, ConditionalWriterConfig config) { + ConditionalWriterImpl(ClientContext context, TableId tableId, String tableName, + ConditionalWriterConfig config) { this.context = context; this.auths = 
config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); @@ -373,6 +371,7 @@ class ConditionalWriterImpl implements ConditionalWriter { this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; + this.tableName = tableName; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); this.durability = config.getDurability(); this.classLoaderContext = config.getClassLoaderContext(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java index 31b5b3d..f7fad0d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.core.clientImpl; -import static com.google.common.base.Preconditions.checkArgument; - import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; @@ -92,9 +90,8 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector { public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { - checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(context, context.getTableId(tableName), authorizations, - numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory) + return context.createBatchDeleter(tableName, authorizations, numQueryThreads, + new BatchWriterConfig().setMaxMemory(maxMemory) .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); } @@ -107,9 +104,8 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector { @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, 
int maxWriteThreads) throws TableNotFoundException { - return new BatchWriterImpl(context, context.getTableId(tableName), - new BatchWriterConfig().setMaxMemory(maxMemory) - .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + return context.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); } @Override @@ -121,7 +117,7 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector { @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { - return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory) + return context.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(maxMemory) .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java index 56851af..7b143cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java @@ -19,21 +19,17 @@ package org.apache.accumulo.core.clientImpl; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.util.Collections; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.TableNotFoundException; 
import org.apache.accumulo.core.client.admin.ReplicationOperations; -import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; @@ -58,33 +54,26 @@ public class ReplicationOperationsImpl implements ReplicationOperations { private final ClientContext context; public ReplicationOperationsImpl(ClientContext context) { - requireNonNull(context); - this.context = context; + this.context = requireNonNull(context); } @Override public void addPeer(final String name, final String replicaType) throws AccumuloException, AccumuloSecurityException { - requireNonNull(name); - requireNonNull(replicaType); - context.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + name, - replicaType); + context.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + requireNonNull(name), requireNonNull(replicaType)); } @Override public void removePeer(final String name) throws AccumuloException, AccumuloSecurityException { - requireNonNull(name); - context.instanceOperations().removeProperty(Property.REPLICATION_PEERS.getKey() + name); + context.instanceOperations() + .removeProperty(Property.REPLICATION_PEERS.getKey() + requireNonNull(name)); } @Override public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - requireNonNull(tableName); - - Set<String> wals = referencedFiles(tableName); - - drain(tableName, wals); + drain(tableName, referencedFiles(requireNonNull(tableName))); } @Override @@ -119,33 +108,10 @@ public class ReplicationOperationsImpl implements ReplicationOperations { client -> client.drainReplicationTable(tinfo, rpcCreds, tableName, wals)); } - protected TableId getTableId(AccumuloClient client, String tableName) - throws TableNotFoundException { - TableOperations tops = client.tableOperations(); - - if 
(!client.tableOperations().exists(tableName)) { - throw new TableNotFoundException(null, tableName, null); - } - - String tableId = null; - while (tableId == null) { - tableId = tops.tableIdMap().get(tableName); - if (tableId == null) { - sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - } - } - - return TableId.of(tableId); - } - @Override public Set<String> referencedFiles(String tableName) throws TableNotFoundException { - requireNonNull(tableName); - log.debug("Collecting referenced files for replication of table {}", tableName); - - TableId tableId = getTableId(context, tableName); - + TableId tableId = context.getTableId(tableName); log.debug("Found id of {} for name {}", tableId, tableName); // Get the WALs currently referenced by the table diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index d189de8..56c56bd 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -72,7 +72,6 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.NamespaceExistsException; import org.apache.accumulo.core.client.NamespaceNotFoundException; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; @@ -395,7 +394,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new NamespaceNotFoundException(e); case OFFLINE: throw new TableOfflineException( - Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName))); + e.getTableId() == null ? 
null : TableId.of(e.getTableId()), tableOrNamespaceName); case BULK_CONCURRENT_MERGE: throw new AccumuloBulkMergeException(e); default: @@ -416,11 +415,11 @@ public class TableOperationsImpl extends TableOperationsHelper { } private static class SplitEnv { - private String tableName; - private TableId tableId; - private ExecutorService executor; - private CountDownLatch latch; - private AtomicReference<Exception> exception; + private final String tableName; + private final TableId tableId; + private final ExecutorService executor; + private final CountDownLatch latch; + private final AtomicReference<Exception> exception; SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) { @@ -449,7 +448,7 @@ public class TableOperationsImpl extends TableOperationsHelper { return; if (splits.size() <= 2) { - addSplits(env.tableName, new TreeSet<>(splits), env.tableId); + addSplits(env, new TreeSet<>(splits)); splits.forEach(s -> env.latch.countDown()); return; } @@ -458,7 +457,7 @@ public class TableOperationsImpl extends TableOperationsHelper { // split the middle split point to ensure that child task split // different tablets and can therefore run in parallel - addSplits(env.tableName, new TreeSet<>(splits.subList(mid, mid + 1)), env.tableId); + addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1))); env.latch.countDown(); env.executor.execute(new SplitTask(env, splits.subList(0, mid))); @@ -504,7 +503,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } else if (excep instanceof TableOfflineException) { log.debug("TableOfflineException occurred in background thread. 
Throwing new exception", excep); - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + throw new TableOfflineException(tableId, tableName); } else if (excep instanceof AccumuloSecurityException) { // base == background accumulo security exception AccumuloSecurityException base = (AccumuloSecurityException) excep; @@ -526,12 +525,10 @@ public class TableOperationsImpl extends TableOperationsHelper { } } - private void addSplits(String tableName, SortedSet<Text> partitionKeys, TableId tableId) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - AccumuloServerException { - EXISTING_TABLE_NAME.validate(tableName); + private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, AccumuloServerException { - TabletLocator tabLocator = TabletLocator.getLocator(context, tableId); + TabletLocator tabLocator = TabletLocator.getLocator(context, env.tableId); for (Text split : partitionKeys) { boolean successful = false; int attempt = 0; @@ -547,10 +544,8 @@ public class TableOperationsImpl extends TableOperationsHelper { TabletLocation tl = tabLocator.locateTablet(context, split, false, false); if (tl == null) { - if (!Tables.exists(context, tableId)) - throw new TableNotFoundException(tableId.canonical(), tableName, null); - else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + context.requireTableExists(env.tableId, env.tableName); + context.requireNotOffline(env.tableId, env.tableName); continue; } @@ -591,15 +586,14 @@ public class TableOperationsImpl extends TableOperationsHelper { continue; } catch (ThriftSecurityException e) { Tables.clearCache(context); - if (!Tables.exists(context, tableId)) - throw new TableNotFoundException(tableId.canonical(), tableName, null); + context.requireTableExists(env.tableId, env.tableName); throw new 
AccumuloSecurityException(e.user, e.code, e); } catch (NotServingTabletException e) { // Do not silently spin when we repeatedly fail to get the location for a tablet locationFailures++; if (locationFailures == 5 || locationFailures % 50 == 0) { log.warn("Having difficulty locating hosting tabletserver for split {} on table {}." - + " Seen {} failures.", split, tableName, locationFailures); + + " Seen {} failures.", split, env.tableName, locationFailures); } tabLocator.invalidateCache(tl.tablet_extent); @@ -661,9 +655,7 @@ public class TableOperationsImpl extends TableOperationsHelper { private List<Text> _listSplits(String tableName) throws TableNotFoundException, AccumuloSecurityException { - EXISTING_TABLE_NAME.validate(tableName); - - TableId tableId = Tables.getTableId(context, tableName); + TableId tableId = context.getTableId(tableName); TreeMap<KeyExtent,String> tabletLocations = new TreeMap<>(); while (true) { try { @@ -674,9 +666,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } catch (AccumuloSecurityException ase) { throw ase; } catch (Exception e) { - if (!Tables.exists(context, tableId)) { - throw new TableNotFoundException(tableId.canonical(), tableName, null); - } + context.requireTableExists(tableId, tableName); if (e instanceof RuntimeException && e.getCause() instanceof AccumuloSecurityException) { throw (AccumuloSecurityException) e.getCause(); @@ -751,10 +741,9 @@ public class TableOperationsImpl extends TableOperationsHelper { public void clone(String srcTableName, String newTableName, CloneConfiguration config) throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException { - EXISTING_TABLE_NAME.validate(srcTableName); NEW_TABLE_NAME.validate(newTableName); - TableId srcTableId = Tables.getTableId(context, srcTableName); + TableId srcTableId = context.getTableId(srcTableName); if (config.isFlush()) _flush(srcTableId, null, null, true); @@ -811,9 +800,7 @@ public class 
TableOperationsImpl extends TableOperationsHelper { @Override public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - EXISTING_TABLE_NAME.validate(tableName); - TableId tableId = Tables.getTableId(context, tableName); - _flush(tableId, start, end, wait); + _flush(context.getTableId(tableName), start, end, wait); } @Override @@ -1148,10 +1135,8 @@ public class TableOperationsImpl extends TableOperationsHelper { // tablets... so clear it tl.invalidateCache(); while (!tl.binRanges(context, Collections.singletonList(range), binnedRanges).isEmpty()) { - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableId.canonical()); - if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, tableName); log.warn("Unable to locate bins for specified range. 
Retrying."); // sleep randomly between 100 and 200ms @@ -1257,8 +1242,7 @@ public class TableOperationsImpl extends TableOperationsHelper { Tables.clearCache(context); TableState currentState = Tables.getTableState(context, tableId); if (currentState != expectedState) { - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableId.canonical()); + context.requireNotDeleted(tableId); if (currentState == TableState.DELETING) throw new TableNotFoundException(tableId.canonical(), "", "Table is being deleted."); throw new AccumuloException("Unexpected table state " + tableId + " " @@ -1832,20 +1816,14 @@ public class TableOperationsImpl extends TableOperationsHelper { .logInterval(3, TimeUnit.MINUTES).createRetry(); while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) { - - if (!Tables.exists(context, tableId)) - throw new TableNotFoundException(tableId.canonical(), tableName, null); - if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); - + context.requireTableExists(tableId, tableName); + context.requireNotOffline(tableId, tableName); binnedRanges.clear(); - try { retry.waitForNextAttempt(); } catch (InterruptedException e) { throw new RuntimeException(e); } - locator.invalidateCache(); } @@ -1854,7 +1832,14 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public SummaryRetriever summaries(String tableName) { - EXISTING_TABLE_NAME.validate(tableName); + TableId tableId; + try { + tableId = context.getTableId(tableName); + } catch (TableNotFoundException e) { + // this has to be a runtime exception, because TableNotFoundException wasn't put on the + // interface in 2.0 and adding it now would break the API contract + throw new IllegalArgumentException(e); + } return new SummaryRetriever() { private Text startRow = null; @@ -1882,9 +1867,7 @@ public class TableOperationsImpl extends TableOperationsHelper { 
@Override public List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - TableId tableId = Tables.getTableId(context, tableName); - if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + context.requireNotOffline(tableId, tableName); TRowRange range = new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow)); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/Tables.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/Tables.java index 2c3a64b..ae35b69 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/Tables.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/Tables.java @@ -139,18 +139,6 @@ public class Tables { return tableName; } - public static String getTableOfflineMsg(ClientContext context, TableId tableId) { - if (tableId == null) - return "Table <unknown table> is offline"; - - try { - String tableName = Tables.getTableName(context, tableId); - return "Table " + tableName + " (" + tableId.canonical() + ") is offline"; - } catch (TableNotFoundException e) { - return "Table <unknown table> (" + tableId.canonical() + ") is offline"; - } - } - public static Map<String,TableId> getNameToIdMap(ClientContext context) { return getTableMap(context).getNameToIdMap(); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchDeleter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchDeleter.java index 37606e8..4eb4557 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchDeleter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchDeleter.java @@ -40,9 +40,9 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements private TableId tableId; private BatchWriterConfig bwConfig; - public 
TabletServerBatchDeleter(ClientContext context, TableId tableId, + public TabletServerBatchDeleter(ClientContext context, TableId tableId, String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig bwConfig) { - super(context, BatchDeleter.class, tableId, authorizations, numQueryThreads); + super(context, BatchDeleter.class, tableId, tableName, authorizations, numQueryThreads); this.context = context; this.tableId = tableId; this.bwConfig = bwConfig; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index a3b7816..55e70e3 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -46,6 +46,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan private final int batchReaderInstance = nextBatchReaderInstance.getAndIncrement(); private final TableId tableId; + private final String tableName; private final int numThreads; private final ThreadPoolExecutor queryThreadPool; private final ClientContext context; @@ -55,19 +56,20 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan private ArrayList<Range> ranges = null; - public TabletServerBatchReader(ClientContext context, TableId tableId, + public TabletServerBatchReader(ClientContext context, TableId tableId, String tableName, Authorizations authorizations, int numQueryThreads) { - this(context, BatchScanner.class, tableId, authorizations, numQueryThreads); + this(context, BatchScanner.class, tableId, tableName, authorizations, numQueryThreads); } protected TabletServerBatchReader(ClientContext context, Class<?> scopeClass, TableId tableId, - Authorizations authorizations, int numQueryThreads) { + String tableName, Authorizations authorizations, int numQueryThreads) { 
checkArgument(context != null, "context is null"); checkArgument(tableId != null, "tableId is null"); checkArgument(authorizations != null, "authorizations is null"); this.context = context; this.authorizations = authorizations; this.tableId = tableId; + this.tableName = tableName; this.numThreads = numQueryThreads; queryThreadPool = ThreadPools.createFixedThreadPool(numQueryThreads, @@ -114,7 +116,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan throw new IllegalStateException("batch reader closed"); } - return new TabletServerBatchReaderIterator(context, tableId, authorizations, ranges, numThreads, - queryThreadPool, this, timeOut); + return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges, + numThreads, queryThreadPool, this, timeOut); } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index d364f39..84629f6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -44,7 +44,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.Column; @@ -58,7 +57,6 @@ import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyValue; import 
org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; @@ -83,6 +81,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private final ClientContext context; private final TableId tableId; + private final String tableName; private Authorizations authorizations = Authorizations.EMPTY; private final int numThreads; private final ExecutorService queryThreadPool; @@ -108,12 +107,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value void receive(List<Entry<Key,Value>> entries); } - public TabletServerBatchReaderIterator(ClientContext context, TableId tableId, + public TabletServerBatchReaderIterator(ClientContext context, TableId tableId, String tableName, Authorizations authorizations, ArrayList<Range> ranges, int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) { this.context = context; this.tableId = tableId; + this.tableName = tableName; this.authorizations = authorizations; this.numThreads = numThreads; this.queryThreadPool = queryThreadPool; @@ -246,12 +246,10 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value // the table was deleted the tablet locator entries for the deleted table were not // cleared... 
so // need to always do the check when failures occur - if (failures.size() >= lastFailureSize) - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableId.canonical()); - else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); - + if (failures.size() >= lastFailureSize) { + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, tableName); + } lastFailureSize = failures.size(); if (log.isTraceEnabled()) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 8c171ff..30369fe 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -18,6 +18,10 @@ */ package org.apache.accumulo.core.clientImpl; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + import java.io.IOException; import java.lang.management.CompilationMXBean; import java.lang.management.GarbageCollectorMXBean; @@ -38,7 +42,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -61,7 +64,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.dataImpl.thrift.TMutation; import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors; -import org.apache.accumulo.core.manager.state.tables.TableState; import 
org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; @@ -486,26 +488,13 @@ public class TabletServerBatchWriter implements AutoCloseable { } } - private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) { - HashMap<KeyExtent,SecurityErrorCode> map = new HashMap<>(); - for (KeyExtent ke : keySet) - map.put(ke, code); - - updateAuthorizationFailures(map); - } - private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) { if (!authorizationFailures.isEmpty()) { // was a table deleted? - HashSet<TableId> tableIds = new HashSet<>(); - for (KeyExtent ke : authorizationFailures.keySet()) - tableIds.add(ke.tableId()); - Tables.clearCache(context); - for (TableId tableId : tableIds) - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableId.canonical()); + authorizationFailures.keySet().stream().map(KeyExtent::tableId) + .forEach(context::requireNotDeleted); synchronized (this) { somethingFailed = true; @@ -679,12 +668,10 @@ public class TabletServerBatchWriter implements AutoCloseable { if (!tableFailures.isEmpty()) { failedMutations.add(tableId, tableFailures); - if (tableFailures.size() == tableMutations.size()) - if (!Tables.exists(context, entry.getKey())) - throw new TableDeletedException(entry.getKey().canonical()); - else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException( - Tables.getTableOfflineMsg(context, entry.getKey())); + if (tableFailures.size() == tableMutations.size()) { + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, null); + } } } @@ -916,7 +903,7 @@ public class TabletServerBatchWriter implements AutoCloseable { getLocator(entry.getKey().tableId()).invalidateCache(entry.getKey()); } catch (ConstraintViolationException e) { 
updatedConstraintViolations(e.violationSummaries.stream() - .map(ConstraintViolationSummary::new).collect(Collectors.toList())); + .map(ConstraintViolationSummary::new).collect(toList())); } timeoutTracker.madeProgress(); } else { @@ -944,15 +931,15 @@ public class TabletServerBatchWriter implements AutoCloseable { UpdateErrors updateErrors = client.closeUpdate(tinfo, usid); // @formatter:off - Map<KeyExtent,Long> failures = updateErrors.failedExtents.entrySet().stream().collect(Collectors.toMap( + Map<KeyExtent,Long> failures = updateErrors.failedExtents.entrySet().stream().collect(toMap( entry -> KeyExtent.fromThrift(entry.getKey()), Entry::getValue )); // @formatter:on updatedConstraintViolations(updateErrors.violationSummaries.stream() - .map(ConstraintViolationSummary::new).collect(Collectors.toList())); + .map(ConstraintViolationSummary::new).collect(toList())); // @formatter:off - updateAuthorizationFailures(updateErrors.authorizationFailures.entrySet().stream().collect(Collectors.toMap( + updateAuthorizationFailures(updateErrors.authorizationFailures.entrySet().stream().collect(toMap( entry -> KeyExtent.fromThrift(entry.getKey()), Entry::getValue ))); @@ -991,7 +978,8 @@ public class TabletServerBatchWriter implements AutoCloseable { updateServerErrors(location, tae); throw new AccumuloServerException(location, tae); } catch (ThriftSecurityException e) { - updateAuthorizationFailures(tabMuts.keySet(), e.code); + updateAuthorizationFailures( + tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code))); throw new AccumuloSecurityException(e.user, e.code, e); } catch (TException e) { throw new IOException(e); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 8b44b81..382ffe5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -37,9 +37,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.SampleNotPresentException; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -56,7 +54,6 @@ import org.apache.accumulo.core.dataImpl.thrift.InitialScan; import org.apache.accumulo.core.dataImpl.thrift.IterInfo; import org.apache.accumulo.core.dataImpl.thrift.ScanResult; import org.apache.accumulo.core.dataImpl.thrift.TKeyValue; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; @@ -269,11 +266,8 @@ public class ThriftScanner { scanState.startRow, scanState.skipStartRow, false); if (loc == null) { - if (!Tables.exists(context, scanState.tableId)) - throw new TableDeletedException(scanState.tableId.canonical()); - else if (Tables.getTableState(context, scanState.tableId) == TableState.OFFLINE) - throw new TableOfflineException( - Tables.getTableOfflineMsg(context, scanState.tableId)); + context.requireNotDeleted(scanState.tableId); + context.requireNotOffline(scanState.tableId, null); error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + scanState.startRow; @@ -323,8 +317,7 @@ public class ThriftScanner { results = scan(loc, scanState, context); } catch (AccumuloSecurityException e) { 
Tables.clearCache(context); - if (!Tables.exists(context, scanState.tableId)) - throw new TableDeletedException(scanState.tableId.canonical()); + context.requireNotDeleted(scanState.tableId); e.setTableInfo(Tables.getPrintableTableInfoFromId(context, scanState.tableId)); throw e; } catch (TApplicationException tae) { diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java index 33e3405..7654df8 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java @@ -40,7 +40,8 @@ public class TabletServerBatchReaderTest { @Test public void testGetAuthorizations() { Authorizations expected = new Authorizations("a,b"); - try (BatchScanner s = new TabletServerBatchReader(context, TableId.of("foo"), expected, 1)) { + try (BatchScanner s = + new TabletServerBatchReader(context, TableId.of("foo"), "fooName", expected, 1)) { assertEquals(expected, s.getAuthorizations()); } } @@ -48,6 +49,6 @@ public class TabletServerBatchReaderTest { @Test public void testNullAuthorizationsFails() { assertThrows(IllegalArgumentException.class, - () -> new TabletServerBatchReader(context, TableId.of("foo"), null, 1)); + () -> new TabletServerBatchReader(context, TableId.of("foo"), "fooName", null, 1)); } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java index 0a77294..3173d0c 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java @@ -42,7 +42,6 @@ import org.apache.accumulo.core.client.IsolatedScanner; import 
org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -56,7 +55,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig; @@ -345,11 +343,8 @@ public abstract class AccumuloRecordReader<K,V> implements RecordReader<K,V> { tl.invalidateCache(); while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { - String tableIdStr = tableId.canonical(); - if (!Tables.exists(context, tableId)) - throw new TableDeletedException(tableIdStr); - if (Tables.getTableState(context, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, tableName); binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. 
Retrying."); // sleep randomly between 100 and 200 ms diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java index 5cbe561..fc83e6f 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java @@ -42,7 +42,6 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -56,7 +55,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator; @@ -379,11 +377,8 @@ public abstract class AccumuloRecordReader<K,V> extends RecordReader<K,V> { tl.invalidateCache(); while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) { - String tableIdStr = tableId.canonical(); - if (!Tables.exists(clientContext, tableId)) - throw new TableDeletedException(tableIdStr); - if (Tables.getTableState(clientContext, tableId) == TableState.OFFLINE) - throw new TableOfflineException(Tables.getTableOfflineMsg(clientContext, tableId)); + clientContext.requireNotDeleted(tableId); + 
clientContext.requireNotOffline(tableId, tableName); binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); // sleep randomly between 100 and 200 ms diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java index 1c786f3..ca7bfd9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java @@ -70,9 +70,7 @@ public class TableLoadBalancer extends TabletBalancer { protected String getLoadBalancerClassNameForTable(TableId table) { TableState tableState = context.getTableManager().getTableState(table); - if (tableState == null) - return null; - if (tableState.equals(TableState.ONLINE)) + if (tableState == TableState.ONLINE) return this.context.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER); return null; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 204f7d7..0f28945 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -326,8 +326,7 @@ public class Manager extends AbstractServer for (Entry<TableId,TableCounts> entry : watcher.getStats().entrySet()) { TableId tableId = entry.getKey(); TableCounts counts = entry.getValue(); - TableState tableState = manager.getTableState(tableId); - if (tableState != null && tableState.equals(TableState.ONLINE)) { + if (manager.getTableState(tableId) == TableState.ONLINE) { result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned() + counts.suspended(); } @@ -357,7 +356,7 @@ public class Manager extends AbstractServer public void mustBeOnline(final TableId tableId) 
throws ThriftTableOperationException { ServerContext context = getContext(); Tables.clearCache(context); - if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE)) { + if (Tables.getTableState(context, tableId) != TableState.ONLINE) { throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online"); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java index a3025cf..ede0832 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java @@ -103,7 +103,7 @@ public class TablesResource { TableInfo tableInfo = tableStats.get(tableId); TableState tableState = tableManager.getTableState(tableId); - if (tableInfo != null && !tableState.equals(TableState.OFFLINE)) { + if (tableInfo != null && tableState != TableState.OFFLINE) { Double holdTime = compactingByTable.get(tableId.canonical()); if (holdTime == null) { holdTime = 0.;