ACCUMULO-802 updated more shell commands to include the tableNamespaces option
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5acd6a48 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5acd6a48 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5acd6a48 Branch: refs/heads/master Commit: 5acd6a480a93b1c13639db8e8ba64a81c32651bb Parents: 431f4e2 Author: Sean Hickey <tallirishll...@gmail.com> Authored: Tue Aug 6 08:54:05 2013 -0400 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Wed Dec 4 18:46:09 2013 -0500 ---------------------------------------------------------------------- .../client/admin/TableNamespaceOperations.java | 9 + .../admin/TableNamespaceOperationsImpl.java | 46 + .../core/client/admin/TableOperationsImpl.java | 411 ++--- .../core/client/impl/thrift/ClientService.java | 1431 ++++++++++++++++++ .../mock/MockTableNamespaceOperations.java | 17 +- .../util/shell/commands/ConstraintCommand.java | 18 +- .../util/shell/commands/DeleteIterCommand.java | 37 +- .../util/shell/commands/ListIterCommand.java | 32 +- .../core/util/shell/commands/OptUtil.java | 25 + .../util/shell/commands/SetIterCommand.java | 98 +- .../core/util/shell/commands/TablesCommand.java | 8 +- core/src/main/thrift/client.thrift | 1 + .../server/client/ClientServiceHandler.java | 46 + .../server/conf/TableNamespaceConfWatcher.java | 107 ++ .../server/conf/TableNamespaceConfWatcher.java | 107 -- .../org/apache/accumulo/test/ShellServerIT.java | 4 + 16 files changed, 2048 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java index f572104..314d007 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java @@ -414,4 +414,13 @@ public interface TableNamespaceOperations { * thrown if the table namespace no longer exists */ public Map<String,Integer> listConstraints(String tableNamespace) throws AccumuloException, TableNamespaceNotFoundException; + + + /** + * Test to see if the instance can load the given class as the given type. This check uses the table classpath property if it is set. + * + * @return true if the instance can load the given class as the given type, false otherwise + */ + boolean testClassLoad(String namespace, String className, String asTypeName) throws TableNamespaceNotFoundException, AccumuloException, + AccumuloSecurityException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java index 0d54b51..90d59af 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.admin; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -34,6 +35,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNamespaceExistsException; import org.apache.accumulo.core.client.TableNamespaceNotEmptyException; @@ -49,7 +51,10 @@ import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.constraints.Constraint; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.master.thrift.TableOperation; import org.apache.accumulo.core.security.Credentials; @@ -599,4 +604,45 @@ public class TableNamespaceOperationsImpl extends TableNamespaceOperationsHelper return new TableOperationsImpl(instance, credentials); } + @Override + public void attachIterator(String namespace, IteratorSetting setting, EnumSet<IteratorScope> scopes) throws AccumuloSecurityException, AccumuloException, + TableNamespaceNotFoundException { + testClassLoad(namespace, setting.getIteratorClass(), SortedKeyValueIterator.class.getName()); + super.attachIterator(namespace, setting, scopes); + } + + @Override + public int addConstraint(String namespace, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNamespaceNotFoundException { + testClassLoad(namespace, constraintClassName, Constraint.class.getName()); + return super.addConstraint(namespace, constraintClassName); + } + + @Override + public boolean testClassLoad(final String namespace, final String className, final String asTypeName) throws TableNamespaceNotFoundException, + AccumuloException, AccumuloSecurityException { + ArgumentChecker.notNull(namespace, className, asTypeName); + + try { + return ServerClient.executeRaw(instance, new ClientExecReturn<Boolean,ClientService.Client>() { + @Override + public Boolean execute(ClientService.Client client) throws Exception { + return client.checkTableNamespaceClass(Tracer.traceInfo(), credentials.toThrift(instance), namespace, className, asTypeName); + } + }); + } catch (ThriftTableOperationException e) { + switch (e.getType()) { + case NOTFOUND: + throw new TableNamespaceNotFoundException(e); + case OTHER: + default: + throw new AccumuloException(e.description, e); + } + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (AccumuloException e) { + throw e; + } catch (Exception e) { + throw new AccumuloException(e); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java index 18d2e40..be1a783 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java @@ -127,11 +127,11 @@ import org.apache.thrift.transport.TTransportException; public class TableOperationsImpl extends TableOperationsHelper { private Instance instance; private Credentials credentials; - + public static final String CLONE_EXCLUDE_PREFIX = "!"; private static final Logger log = Logger.getLogger(TableOperations.class); - + /** * @param instance * the connection information for this instance @@ -143,7 +143,7 @@ public class TableOperationsImpl extends TableOperationsHelper { this.instance = instance; this.credentials = credentials; } - + /** * Retrieve a list of tables in Accumulo. * @@ -156,7 +156,7 @@ public class TableOperationsImpl extends TableOperationsHelper { opTimer.stop("Fetched " + tableNames.size() + " table names in %DURATION%"); return tableNames; } - + /** * A method to check if a table exists in Accumulo. * @@ -169,13 +169,13 @@ public class TableOperationsImpl extends TableOperationsHelper { ArgumentChecker.notNull(tableName); if (tableName.equals(MetadataTable.NAME) || tableName.equals(RootTable.NAME)) return true; - + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Checking if table " + tableName + " exists..."); boolean exists = Tables.getNameToIdMap(instance).containsKey(tableName); opTimer.stop("Checked existance of " + exists + " in %DURATION%"); return exists; } - + /** * Create a table with no special configuration * @@ -192,7 +192,7 @@ public class TableOperationsImpl extends TableOperationsHelper { public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { create(tableName, true, TimeType.MILLIS); } - + /** * @param tableName * the name of the table @@ -203,7 +203,7 @@ public class TableOperationsImpl extends TableOperationsHelper { public void create(String tableName, boolean limitVersion) throws AccumuloException, AccumuloSecurityException, TableExistsException { create(tableName, limitVersion, TimeType.MILLIS); } - + /** * @param tableName * the name of the table @@ -215,12 +215,12 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public void create(String tableName, boolean limitVersion, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException { ArgumentChecker.notNull(tableName, timeType); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(timeType.name().getBytes())); - + // Map<String,String> opts = IteratorUtil.generateInitialTableProperties(limitVersion); Map<String,String> opts = new HashMap<String,String>(); - + String namespace = Tables.extractNamespace(tableName); if (!namespaceExists(namespace)) { String info = "Table namespace not found while trying to create table"; @@ -229,7 +229,7 @@ public class TableOperationsImpl extends TableOperationsHelper { String info = "Can't create tables in the system namespace"; throw new IllegalArgumentException(info); } - + try { doTableOperation(TableOperation.CREATE, args, opts); } catch (TableNotFoundException e1) { @@ -237,7 +237,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e1); } } - + private long beginTableOperation() throws ThriftSecurityException, TException { while (true) { MasterClientService.Iface client = null; @@ -252,7 +252,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } } } - + private void executeTableOperation(long opid, TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp) throws ThriftSecurityException, TException, ThriftTableOperationException { while (true) { @@ -269,7 +269,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } } } - + private String waitForTableOperation(long opid) throws ThriftSecurityException, TException, ThriftTableOperationException { while (true) { MasterClientService.Iface client = null; @@ -284,7 +284,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } } } - + private void finishTableOperation(long opid) throws ThriftSecurityException, TException { while (true) { MasterClientService.Iface client = null; @@ -300,16 +300,16 @@ public class TableOperationsImpl extends TableOperationsHelper { } } } - + private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException { return doTableOperation(op, args, opts, true); } - + private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean wait) throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException { Long opid = null; - + try { opid = beginTableOperation(); executeTableOperation(opid, op, args, opts, !wait); @@ -348,14 +348,14 @@ public class TableOperationsImpl extends TableOperationsHelper { } } } - + private static class SplitEnv { private String tableName; private String tableId; private ExecutorService executor; private CountDownLatch latch; private AtomicReference<Exception> exception; - + SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) { this.tableName = tableName; this.tableId = tableId; @@ -364,47 +364,47 @@ public class TableOperationsImpl extends TableOperationsHelper { this.exception = exception; } } - + private class SplitTask implements Runnable { - + private List<Text> splits; private SplitEnv env; - + SplitTask(SplitEnv env, List<Text> splits) { this.env = env; this.splits = splits; } - + @Override public void run() { try { if (env.exception.get() != null) return; - + if (splits.size() <= 2) { addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId); for (int i = 0; i < splits.size(); i++) env.latch.countDown(); return; } - + int mid = splits.size() / 2; - + // split the middle split point to ensure that child task split different tablets and can therefore // run in parallel addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId); env.latch.countDown(); - + env.executor.submit(new SplitTask(env, splits.subList(0, mid))); env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size()))); - + } catch (Exception e) { env.exception.compareAndSet(null, e); } } - + } - + /** * @param tableName * the name of the table @@ -420,19 +420,19 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { String tableId = Tables.getTableId(instance, tableName); - + List<Text> splits = new ArrayList<Text>(partitionKeys); // should be sorted because we copied from a sorted set, but that makes assumptions about // how the copy was done so resort to be sure. Collections.sort(splits); - + CountDownLatch latch = new CountDownLatch(splits.size()); AtomicReference<Exception> exception = new AtomicReference<Exception>(null); - + ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits")); try { executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); - + while (!latch.await(100, TimeUnit.MILLISECONDS)) { if (exception.get() != null) { executor.shutdownNow(); @@ -455,24 +455,24 @@ public class TableOperationsImpl extends TableOperationsHelper { executor.shutdown(); } } - + private void addSplits(String tableName, SortedSet<Text> partitionKeys, String tableId) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, AccumuloServerException { TabletLocator tabLocator = TabletLocator.getLocator(instance, new Text(tableId)); - + for (Text split : partitionKeys) { boolean successful = false; int attempt = 0; - + while (!successful) { - + if (attempt > 0) UtilWaitThread.sleep(100); - + attempt++; - + TabletLocation tl = tabLocator.locateTablet(credentials, split, false, false); - + if (tl == null) { if (!Tables.exists(instance, tableId)) throw new TableNotFoundException(tableId, tableName, null); @@ -480,25 +480,25 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new TableOfflineException(instance, tableId); continue; } - + try { TabletClientService.Client client = ThriftUtil.getTServerClient(tl.tablet_location, ServerConfigurationUtil.getConfiguration(instance)); try { OpTimer opTimer = null; if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + tl.tablet_location + " at " + split); - + client.splitTablet(Tracer.traceInfo(), credentials.toThrift(instance), tl.tablet_extent.toThrift(), TextUtil.getByteBuffer(split)); - + // just split it, might as well invalidate it in the cache tabLocator.invalidateCache(tl.tablet_extent); - + if (opTimer != null) opTimer.stop("Split tablet in %DURATION%"); } finally { ThriftUtil.returnClient(client); } - + } catch (TApplicationException tae) { throw new AccumuloServerException(tl.tablet_location, tae); } catch (TTransportException e) { @@ -516,15 +516,15 @@ public class TableOperationsImpl extends TableOperationsHelper { tabLocator.invalidateCache(tl.tablet_location); continue; } - + successful = true; } } } - + @Override public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + ArgumentChecker.notNull(tableName); ByteBuffer EMPTY = ByteBuffer.allocate(0); List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY @@ -537,10 +537,10 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e); } } - + @Override public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + ArgumentChecker.notNull(tableName); ByteBuffer EMPTY = ByteBuffer.allocate(0); List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY @@ -553,7 +553,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e); } } - + /** * @param tableName * the name of the table @@ -561,13 +561,13 @@ public class TableOperationsImpl extends TableOperationsHelper { */ @Override public Collection<Text> listSplits(String tableName) throws TableNotFoundException, AccumuloSecurityException { - + ArgumentChecker.notNull(tableName); - + String tableId = Tables.getTableId(instance, tableName); - + TreeMap<KeyExtent,String> tabletLocations = new TreeMap<KeyExtent,String>(); - + while (true) { try { tabletLocations.clear(); @@ -580,25 +580,25 @@ public class TableOperationsImpl extends TableOperationsHelper { if (!Tables.exists(instance, tableId)) { throw new TableNotFoundException(tableId, tableName, null); } - + if (e instanceof RuntimeException && e.getCause() instanceof AccumuloSecurityException) { throw (AccumuloSecurityException) e.getCause(); } - + log.info(e.getMessage() + " ... retrying ..."); UtilWaitThread.sleep(3000); } } - + ArrayList<Text> endRows = new ArrayList<Text>(tabletLocations.size()); - + for (KeyExtent ke : tabletLocations.keySet()) if (ke.getEndRow() != null) endRows.add(ke.getEndRow()); - + return endRows; } - + @Deprecated @Override public Collection<Text> getSplits(String tableName) throws TableNotFoundException { @@ -608,7 +608,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e); } } - + /** * @param tableName * the name of the table @@ -620,15 +620,15 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException, AccumuloSecurityException { Collection<Text> endRows = listSplits(tableName); - + if (endRows.size() <= maxSplits) return endRows; - + double r = (maxSplits + 1) / (double) (endRows.size()); double pos = 0; - + ArrayList<Text> subset = new ArrayList<Text>(maxSplits); - + int j = 0; for (int i = 0; i < endRows.size() && j < maxSplits; i++) { pos += r; @@ -638,10 +638,10 @@ public class TableOperationsImpl extends TableOperationsHelper { pos -= 1; } } - + return subset; } - + @Deprecated @Override public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException { @@ -651,7 +651,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e); } } - + /** * Delete a table * @@ -667,51 +667,51 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { ArgumentChecker.notNull(tableName); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes())); Map<String,String> opts = new HashMap<String,String>(); - + try { doTableOperation(TableOperation.DELETE, args, opts); } catch (TableExistsException e) { // should not happen throw new RuntimeException(e); } - + } - + @Override public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude) throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException { - + ArgumentChecker.notNull(srcTableName, newTableName); - + String namespace = Tables.extractNamespace(newTableName); if (!namespaceExists(namespace)) { String info = "Table namespace not found while cloning table"; throw new IllegalArgumentException(new TableNamespaceNotFoundException(null, namespace, info)); } - + String srcTableId = Tables.getTableId(instance, srcTableName); - + if (flush) _flush(srcTableId, null, null, true); - + if (propertiesToExclude == null) propertiesToExclude = Collections.emptySet(); - + if (propertiesToSet == null) propertiesToSet = Collections.emptyMap(); - + // TODO ACCUMULO-1565 needs to be fixed before the commented-out code below will work. - HashSet<String> excludeProps = new HashSet<String>();//getUniqueNamespaceProperties(namespace, srcTableName); + HashSet<String> excludeProps = new HashSet<String>();// getUniqueNamespaceProperties(namespace, srcTableName); for (String p : propertiesToExclude) { excludeProps.add(p); } - + if (!Collections.disjoint(excludeProps, propertiesToSet.keySet())) throw new IllegalArgumentException("propertiesToSet and propertiesToExclude not disjoint"); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getBytes()), ByteBuffer.wrap(newTableName.getBytes())); Map<String,String> opts = new HashMap<String,String>(); for (Entry<String,String> entry : propertiesToSet.entrySet()) { @@ -719,14 +719,14 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new IllegalArgumentException("Property can not start with " + CLONE_EXCLUDE_PREFIX); opts.put(entry.getKey(), entry.getValue()); } - + for (String prop : propertiesToExclude) { opts.put(CLONE_EXCLUDE_PREFIX + prop, ""); } - + doTableOperation(TableOperation.CLONE, args, opts); } - + // get the properties that are only in the table namespace so that we can exclude them when copying table properties // TODO ACCUMULO-1565 needs fixed first private HashSet<String> getUniqueNamespaceProperties(String namespace, String table) throws TableNotFoundException, AccumuloException { @@ -749,7 +749,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } return props; } - + /** * Rename a table * @@ -769,18 +769,18 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException { - + String namespace = Tables.extractNamespace(newTableName); if (!namespaceExists(namespace)) { String info = "Table namespace not found while renaming table"; throw new IllegalArgumentException(new TableNamespaceNotFoundException(null, namespace, info)); } - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes()), ByteBuffer.wrap(newTableName.getBytes())); Map<String,String> opts = new HashMap<String,String>(); doTableOperation(TableOperation.RENAME, args, opts); } - + /** * @deprecated since 1.4 {@link #flush(String, Text, Text, boolean)} */ @@ -793,7 +793,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new AccumuloException(e.getMessage(), e); } } - + /** * Flush a table * @@ -808,31 +808,31 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { ArgumentChecker.notNull(tableName); - + String tableId = Tables.getTableId(instance, tableName); _flush(tableId, start, end, wait); } - + @Override public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { compact(tableName, start, end, new ArrayList<IteratorSetting>(), flush, wait); } - + @Override public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { ArgumentChecker.notNull(tableName); ByteBuffer EMPTY = ByteBuffer.allocate(0); - + String tableId = Tables.getTableId(instance, tableName); - + if (flush) _flush(tableId, start, end, true); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(iterators))); - + Map<String,String> opts = new HashMap<String,String>(); try { doTableOperation(TableOperation.COMPACT, args, opts, wait); @@ -841,13 +841,13 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e); } } - + @Override public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { String tableId = Tables.getTableId(instance, tableName); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes())); - + Map<String,String> opts = new HashMap<String,String>(); try { doTableOperation(TableOperation.COMPACT_CANCEL, args, opts, true); @@ -855,17 +855,17 @@ public class TableOperationsImpl extends TableOperationsHelper { // should not happen throw new RuntimeException(e); } - + } - + private void _flush(String tableId, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + try { long flushID; - + // used to pass the table name. but the tableid associated with a table name could change between calls. // so pass the tableid to both calls - + while (true) { MasterClientService.Iface client = null; try { @@ -879,7 +879,7 @@ public class TableOperationsImpl extends TableOperationsHelper { MasterClient.close(client); } } - + while (true) { MasterClientService.Iface client = null; try { @@ -912,7 +912,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new AccumuloException(e); } } - + /** * Sets a property on a table * @@ -937,7 +937,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } }); } - + /** * Removes a property from a table * @@ -960,7 +960,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } }); } - + /** * Gets properties of a table * @@ -993,9 +993,9 @@ public class TableOperationsImpl extends TableOperationsHelper { } catch (Exception e) { throw new AccumuloException(e); } - + } - + /** * Sets a tables locality groups. A tables locality groups can be changed at any time. * @@ -1015,22 +1015,22 @@ public class TableOperationsImpl extends TableOperationsHelper { // ensure locality groups do not overlap HashSet<Text> all = new HashSet<Text>(); for (Entry<String,Set<Text>> entry : groups.entrySet()) { - + if (!Collections.disjoint(all, entry.getValue())) { throw new IllegalArgumentException("Group " + entry.getKey() + " overlaps with another group"); } - + all.addAll(entry.getValue()); } - + for (Entry<String,Set<Text>> entry : groups.entrySet()) { Set<Text> colFams = entry.getValue(); String value = LocalityGroupUtil.encodeColumnFamilies(colFams); setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value); } - + setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), StringUtil.join(groups.keySet(), ",")); - + // remove anything extraneous String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey(); for (Entry<String,String> entry : getProperties(tableName)) { @@ -1040,14 +1040,14 @@ public class TableOperationsImpl extends TableOperationsHelper { // one: String[] parts = property.split("\\."); String group = parts[parts.length - 1]; - + if (!groups.containsKey(group)) { removeProperty(tableName, property); } } } } - + /** * * Gets the locality groups currently set for a table. @@ -1064,22 +1064,22 @@ public class TableOperationsImpl extends TableOperationsHelper { public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException { AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName)); Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf); - + Map<String,Set<Text>> groups2 = new HashMap<String,Set<Text>>(); for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) { - + HashSet<Text> colFams = new HashSet<Text>(); - + for (ByteSequence bs : entry.getValue()) { colFams.add(new Text(bs.toArray())); } - + groups2.put(entry.getKey(), colFams); } - + return groups2; } - + /** * @param tableName * the name of the table @@ -1103,7 +1103,7 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new IllegalArgumentException("maximum splits must be >= 1"); if (maxSplits == 1) return Collections.singleton(range); - + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); String tableId = Tables.getTableId(instance, tableName); TabletLocator tl = TabletLocator.getLocator(instance, new Text(tableId)); @@ -1114,24 +1114,24 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new TableDeletedException(tableId); if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) throw new TableOfflineException(instance, tableId); - + log.warn("Unable to locate bins for specified range. Retrying."); // sleep randomly between 100 and 200ms UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); binnedRanges.clear(); tl.invalidateCache(); } - + // group key extents to get <= maxSplits LinkedList<KeyExtent> unmergedExtents = new LinkedList<KeyExtent>(); List<KeyExtent> mergedExtents = new ArrayList<KeyExtent>(); - + for (Map<KeyExtent,List<Range>> map : binnedRanges.values()) unmergedExtents.addAll(map.keySet()); - + // the sort method is efficient for linked list Collections.sort(unmergedExtents); - + while (unmergedExtents.size() + mergedExtents.size() > maxSplits) { if (unmergedExtents.size() >= 2) { KeyExtent first = unmergedExtents.removeFirst(); @@ -1144,15 +1144,15 @@ public class TableOperationsImpl extends TableOperationsHelper { unmergedExtents.addAll(mergedExtents); mergedExtents.clear(); } - + } - + mergedExtents.addAll(unmergedExtents); - + Set<Range> ranges = new HashSet<Range>(); for (KeyExtent k : mergedExtents) ranges.add(k.toDataRange().clip(range)); - + return ranges; } @@ -1191,14 +1191,14 @@ public class TableOperationsImpl extends TableOperationsHelper { public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException, TableNotFoundException, AccumuloException { ArgumentChecker.notNull(tableName, dir, failureDir); - + Path dirPath = checkPath(dir, "Bulk", ""); Path failPath = checkPath(failureDir, "Bulk", "failure"); List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(dirPath.toString().getBytes()), ByteBuffer.wrap(failPath.toString().getBytes()), ByteBuffer.wrap((setTime + "").getBytes())); Map<String,String> opts = new HashMap<String,String>(); - + try { doTableOperation(TableOperation.BULK_IMPORT, args, opts); } catch (TableExistsException e) { @@ -1208,10 +1208,10 @@ public class TableOperationsImpl extends TableOperationsHelper { // return new BulkImportHelper(instance, credentials, tableName).importDirectory(new Path(dir), new Path(failureDir), numThreads, numAssignThreads, // disableGC); } - + private void waitForTableStateTransition(String tableId, TableState expectedState) throws AccumuloException, TableNotFoundException, AccumuloSecurityException { - + Text startRow = null; Text lastRow = null; @@ -1225,12 +1225,13 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new AccumuloException("Unexpected table state " + tableId + " " + Tables.getTableState(instance, tableId) + " != " + expectedState); } } - + Range range = new KeyExtent(new Text(tableId), null, null).toMetadataRange(); - if (startRow == null || lastRow == null) + if (startRow == null || lastRow == null) range = new KeyExtent(new Text(tableId), null, null).toMetadataRange(); else range = new Range(startRow, lastRow); + String metaTable = MetadataTable.NAME; if (tableId.equals(MetadataTable.ID)) metaTable = RootTable.NAME; @@ -1239,83 +1240,84 @@ public class TableOperationsImpl extends TableOperationsHelper { TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); scanner.setRange(range); - + RowIterator rowIter = new RowIterator(scanner); - + KeyExtent lastExtent = null; - + int total = 0; int waitFor = 0; int holes = 0; Text continueRow = null; MapCounter<String> serverCounts = new MapCounter<String>(); - + while (rowIter.hasNext()) { Iterator<Entry<Key,Value>> row = rowIter.next(); - + total++; KeyExtent extent = null; String future = null; String current = null; - + while (row.hasNext()) { Entry<Key,Value> entry = row.next(); Key key = entry.getKey(); - + if (key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) future = entry.getValue().toString(); - + if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)) current = entry.getValue().toString(); - + if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) extent = new KeyExtent(key.getRow(), entry.getValue()); } - + if ((expectedState == TableState.ONLINE && current == null) || (expectedState == TableState.OFFLINE && (future != null || current != null))) { if (continueRow == null) continueRow = extent.getMetadataEntry(); waitFor++; lastRow = extent.getMetadataEntry(); - - if(current != null) + + if (current != null) serverCounts.increment(current, 1); - if(future != null) + if (future != null) serverCounts.increment(future, 1); } - + if (!extent.getTableId().toString().equals(tableId)) { throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); } - + if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { holes++; } - + lastExtent = extent; } - + if (continueRow != null) { startRow = continueRow; } - + if (holes > 0 || total == 0) { startRow = null; lastRow = null; } - + if (waitFor > 0 || holes > 0 || total == 0) { long waitTime; long maxPerServer = 0; - if(serverCounts.size() > 0){ + if (serverCounts.size() > 0) { maxPerServer = Collections.max(serverCounts.values()); waitTime = maxPerServer * 10; - }else + } else waitTime = waitFor * 10; waitTime = Math.max(100, waitTime); waitTime = Math.min(5000, waitTime); - log.trace("Waiting for " + waitFor + "("+maxPerServer+") tablets, startRow = " + startRow + " lastRow = "+lastRow+", holes=" + holes+" sleeping:"+waitTime+"ms"); + log.trace("Waiting for " + waitFor + "(" + maxPerServer + ") tablets, startRow = " + startRow + " lastRow = " + lastRow + ", holes=" + holes + + " sleeping:" + waitTime + "ms"); UtilWaitThread.sleep(waitTime); } else { break; @@ -1323,11 +1325,12 @@ public class TableOperationsImpl extends TableOperationsHelper { } } + @Override public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { offline(tableName, false); } - + /** * * @param tableName @@ -1340,28 +1343,28 @@ public class TableOperationsImpl extends TableOperationsHelper { */ @Override public void offline(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { - + ArgumentChecker.notNull(tableName); String tableId = Tables.getTableId(instance, tableName); List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes())); Map<String,String> opts = new HashMap<String,String>(); - + try { doTableOperation(TableOperation.OFFLINE, args, opts); } catch (TableExistsException e) { // should not happen throw new RuntimeException(e); } - - if(wait) + + if (wait) waitForTableStateTransition(tableId, TableState.OFFLINE); } - + @Override public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { online(tableName, false); } - + /** * * @param tableName @@ -1378,18 +1381,18 @@ public class TableOperationsImpl extends TableOperationsHelper { String tableId = Tables.getTableId(instance, tableName); List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes())); Map<String,String> opts = new HashMap<String,String>(); - + try { doTableOperation(TableOperation.ONLINE, args, opts); } catch (TableExistsException e) { // should not happen throw new RuntimeException(e); } - - if(wait) + + if (wait) waitForTableStateTransition(tableId, TableState.ONLINE); } - + /** * Clears the tablet locator cache for a specified table * @@ -1404,7 +1407,7 @@ public class TableOperationsImpl extends TableOperationsHelper { TabletLocator tabLocator = TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName))); tabLocator.invalidateCache(); } - + /** * Get a mapping of table name to internal table id. * @@ -1414,7 +1417,7 @@ public class TableOperationsImpl extends TableOperationsHelper { public Map<String,String> tableIdMap() { return Tables.getNameToIdMap(instance); } - + @Override public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { @@ -1422,10 +1425,10 @@ public class TableOperationsImpl extends TableOperationsHelper { Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, auths); return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive); } - + @Override public List<DiskUsage> getDiskUsage(Set<String> tableNames) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + List<TDiskUsage> diskUsages = null; while (diskUsages == null) { Pair<String,Client> pair = null; @@ -1457,18 +1460,18 @@ public class TableOperationsImpl extends TableOperationsHelper { ServerClient.close(pair.getSecond()); } } - + List<DiskUsage> finalUsages = new ArrayList<DiskUsage>(); for (TDiskUsage diskUsage : diskUsages) { finalUsages.add(new DiskUsage(new TreeSet<String>(diskUsage.getTables()), diskUsage.getUsage())); } - + return finalUsages; } - + public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException { HashMap<String,String> props = new HashMap<String,String>(); - + ZipInputStream zis = new ZipInputStream(fs.open(path)); try { ZipEntry zipEntry; @@ -1480,7 +1483,7 @@ public class TableOperationsImpl extends TableOperationsHelper { String sa[] = line.split("=", 2); props.put(sa[0], sa[1]); } - + break; } } @@ -1489,11 +1492,11 @@ public class TableOperationsImpl extends TableOperationsHelper { } return props; } - + @Override public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException { ArgumentChecker.notNull(tableName, importDir); - + try { importDir = checkPath(importDir, "Table", "").toString(); } catch (IOException e) { @@ -1503,45 +1506,45 @@ public class TableOperationsImpl extends TableOperationsHelper { try { FileSystem fs = new Path(importDir).getFileSystem(CachedConfiguration.getInstance()); Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE)); - + for (String propKey : props.keySet()) { if (Property.isClassProperty(propKey) && !props.get(propKey).contains(Constants.CORE_PACKAGE_NAME)) { Logger.getLogger(this.getClass()).info( "Imported table sets '" + propKey + "' to '" + props.get(propKey) + "'. Ensure this class is on Accumulo classpath."); } } - + } catch (IOException ioe) { Logger.getLogger(this.getClass()).warn("Failed to check if imported table references external java classes : " + ioe.getMessage()); } - + String namespace = Tables.extractNamespace(tableName); if (!namespaceExists(namespace)) { String info = "Table namespace not found while importing to table"; throw new RuntimeException(new TableNamespaceNotFoundException(null, namespace, info)); } - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(importDir.getBytes())); - + Map<String,String> opts = Collections.emptyMap(); - + try { doTableOperation(TableOperation.IMPORT, args, opts); } catch (TableNotFoundException e1) { // should not happen throw new RuntimeException(e1); } - + } - + @Override public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { ArgumentChecker.notNull(tableName, exportDir); - + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(exportDir.getBytes())); - + Map<String,String> opts = Collections.emptyMap(); - + try { doTableOperation(TableOperation.EXPORT, args, opts); } catch (TableExistsException e1) { @@ -1549,12 +1552,12 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new RuntimeException(e1); } } - + @Override public boolean testClassLoad(final String tableName, final String className, final String asTypeName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { ArgumentChecker.notNull(tableName, className, asTypeName); - + try { return ServerClient.executeRaw(instance, new ClientExecReturn<Boolean,ClientService.Client>() { @Override @@ -1578,20 +1581,20 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new AccumuloException(e); } } - + @Override public void attachIterator(String tableName, IteratorSetting setting, EnumSet<IteratorScope> scopes) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { testClassLoad(tableName, setting.getIteratorClass(), SortedKeyValueIterator.class.getName()); super.attachIterator(tableName, setting, scopes); } - + @Override public int addConstraint(String tableName, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { testClassLoad(tableName, constraintClassName, Constraint.class.getName()); return super.addConstraint(tableName, constraintClassName); } - + private boolean namespaceExists(String namespace) { return TableNamespaces.getNameToIdMap(instance).containsKey(namespace); }