Author: vines Date: Fri Jan 25 05:28:53 2013 New Revision: 1438328 URL: http://svn.apache.org/viewvc?rev=1438328&view=rev Log: ACCUMULO-259 Merging up to 1438309
Added: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteShellterCommand.java - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteShellterCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListShellIterCommand.java - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListShellIterCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetShellIterCommand.java - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetShellIterCommand.java Modified: accumulo/branches/ACCUMULO-259/ (props changed) accumulo/branches/ACCUMULO-259/assemble/ (props changed) accumulo/branches/ACCUMULO-259/core/ (props changed) accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed) accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed) accumulo/branches/ACCUMULO-259/packages/ (props changed) accumulo/branches/ACCUMULO-259/server/ (props changed) accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java accumulo/branches/ACCUMULO-259/src/ (props changed) Propchange: accumulo/branches/ACCUMULO-259/ ------------------------------------------------------------------------------ Merged /accumulo/trunk:r1438310-1438327 Propchange: accumulo/branches/ACCUMULO-259/assemble/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/assemble:r1438310-1438327 Propchange: accumulo/branches/ACCUMULO-259/core/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/core:r1438310-1438327 Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Fri Jan 25 05:28:53 2013 @@ -82,6 +82,7 @@ import org.apache.accumulo.core.util.she import org.apache.accumulo.core.util.shell.commands.DeleteManyCommand; import org.apache.accumulo.core.util.shell.commands.DeleteRowsCommand; import org.apache.accumulo.core.util.shell.commands.DeleteScanIterCommand; +import org.apache.accumulo.core.util.shell.commands.DeleteShellterCommand; import org.apache.accumulo.core.util.shell.commands.DeleteTableCommand; import org.apache.accumulo.core.util.shell.commands.DeleteUserCommand; import org.apache.accumulo.core.util.shell.commands.DropTableCommand; @@ -108,6 +109,7 @@ import org.apache.accumulo.core.util.she import org.apache.accumulo.core.util.shell.commands.ListCompactionsCommand; import org.apache.accumulo.core.util.shell.commands.ListIterCommand; import org.apache.accumulo.core.util.shell.commands.ListScansCommand; +import org.apache.accumulo.core.util.shell.commands.ListShellIterCommand; import org.apache.accumulo.core.util.shell.commands.MaxRowCommand; import org.apache.accumulo.core.util.shell.commands.MergeCommand; import org.apache.accumulo.core.util.shell.commands.NoTableCommand; @@ -125,6 +127,7 @@ import org.apache.accumulo.core.util.she import org.apache.accumulo.core.util.shell.commands.SetGroupsCommand; import org.apache.accumulo.core.util.shell.commands.SetIterCommand; import org.apache.accumulo.core.util.shell.commands.SetScanIterCommand; +import org.apache.accumulo.core.util.shell.commands.SetShellIterCommand; import org.apache.accumulo.core.util.shell.commands.SleepCommand; import org.apache.accumulo.core.util.shell.commands.SystemPermissionsCommand; import org.apache.accumulo.core.util.shell.commands.TableCommand; @@ -169,6 +172,7 @@ public class Shell extends ShellOptions private Class<? extends Formatter> defaultFormatterClass = DefaultFormatter.class; private Class<? extends Formatter> binaryFormatterClass = BinaryFormatter.class; public Map<String,List<IteratorSetting>> scanIteratorOptions = new HashMap<String,List<IteratorSetting>>(); + public Map<String,List<IteratorSetting>> iteratorProfiles = new HashMap<String,List<IteratorSetting>>(); private Token rootToken; public final Map<String,Command> commandFactory = new TreeMap<String,Command>(); @@ -299,7 +303,8 @@ public class Shell extends ShellOptions Command[] execCommands = {new ExecfileCommand(), new HistoryCommand()}; Command[] exitCommands = {new ByeCommand(), new ExitCommand(), new QuitCommand()}; Command[] helpCommands = {new AboutCommand(), new HelpCommand(), new InfoCommand(), new QuestionCommand()}; - Command[] iteratorCommands = {new DeleteIterCommand(), new DeleteScanIterCommand(), new ListIterCommand(), new SetIterCommand(), new SetScanIterCommand()}; + Command[] iteratorCommands = {new DeleteIterCommand(), new DeleteScanIterCommand(), new ListIterCommand(), new SetIterCommand(), new SetScanIterCommand(), + new SetShellIterCommand(), new ListShellIterCommand(), new DeleteShellterCommand()}; Command[] otherCommands = {new HiddenCommand()}; Command[] permissionsCommands = {new GrantCommand(), new RevokeCommand(), new SystemPermissionsCommand(), new TablePermissionsCommand(), new UserPermissionsCommand()}; Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java Fri Jan 25 05:28:53 2013 @@ -16,8 +16,13 @@ */ package org.apache.accumulo.core.util.shell.commands; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.util.shell.Shell; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -25,10 +30,11 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.io.Text; public class CompactCommand extends TableOperation { - private Option noFlushOption, waitOpt; + private Option noFlushOption, waitOpt, profileOpt; private boolean flush; private Text startRow; private Text endRow; + private List<IteratorSetting> iterators; boolean override = false; private boolean wait; @@ -44,7 +50,8 @@ public class CompactCommand extends Tabl if (wait) { Shell.log.info("Compacting table ..."); } - shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, flush, wait); + + shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, iterators, flush, wait); Shell.log.info("Compaction of table " + tableName + " " + (wait ? "completed" : "started") + " for given range"); } catch (Exception ex) { @@ -59,6 +66,19 @@ public class CompactCommand extends Tabl endRow = OptUtil.getEndRow(cl); wait = cl.hasOption(waitOpt.getOpt()); + if (cl.hasOption(profileOpt.getOpt())) { + List<IteratorSetting> iterators = shellState.iteratorProfiles.get(cl.getOptionValue(profileOpt.getOpt())); + if (iterators == null) { + Shell.log.error("Profile " + cl.getOptionValue(profileOpt.getOpt()) + " does not exist"); + return -1; + } + + this.iterators = new ArrayList<IteratorSetting>(iterators); + } else { + this.iterators = Collections.emptyList(); + } + + return super.execute(fullCommand, cl, shellState); } @@ -73,6 +93,10 @@ public class CompactCommand extends Tabl waitOpt = new Option("w", "wait", false, "wait for compact to finish"); opts.addOption(waitOpt); + profileOpt = new Option("pn", "profile", true, "iterator profile name"); + profileOpt.setArgName("profile"); + opts.addOption(profileOpt); + return opts; } } Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java Fri Jan 25 05:28:53 2013 @@ -47,7 +47,7 @@ public class DeleteManyCommand extends S scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, "NOVALUE", SortedKeyIterator.class)); // handle session-specific scan iterators - addScanIterators(shellState, scanner, tableName); + addScanIterators(shellState, cl, scanner, tableName); // handle remaining optional arguments scanner.setRange(getRange(cl, interpeter)); Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java Fri Jan 25 05:28:53 2013 @@ -79,7 +79,7 @@ public class ListIterCommand extends Com } public String description() { - return "lists table-specific iterators"; + return "lists table-specific iterators configured in this shell session"; } @Override Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java Fri Jan 25 05:28:53 2013 @@ -55,6 +55,7 @@ public class ScanCommand extends Command private Option optStartRowExclusive; private Option optEndRowExclusive; private Option timeoutOption; + private Option profileOpt; public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { @@ -72,7 +73,7 @@ public class ScanCommand extends Command final Scanner scanner = shellState.getConnector().createScanner(tableName, auths); // handle session-specific scan iterators - addScanIterators(shellState, scanner, tableName); + addScanIterators(shellState, cl, scanner, tableName); // handle remaining optional arguments scanner.setRange(getRange(cl, interpeter)); @@ -117,12 +118,24 @@ public class ScanCommand extends Command return Long.MAX_VALUE; } - protected void addScanIterators(final Shell shellState, final Scanner scanner, final String tableName) { - final List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName()); - if (tableScanIterators == null) { - Shell.log.debug("Found no scan iterators to set"); - return; + protected void addScanIterators(final Shell shellState, CommandLine cl, final Scanner scanner, final String tableName) { + + List<IteratorSetting> tableScanIterators; + if (cl.hasOption(profileOpt.getOpt())) { + String profile = cl.getOptionValue(profileOpt.getOpt()); + tableScanIterators = shellState.iteratorProfiles.get(profile); + + if (tableScanIterators == null) { + throw new IllegalArgumentException("Profile " + profile + " does not exist"); + } + } else { + tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName()); + if (tableScanIterators == null) { + Shell.log.debug("Found no scan iterators to set"); + return; + } } + Shell.log.debug("Found " + tableScanIterators.size() + " scan iterators to set"); for (IteratorSetting setting : tableScanIterators) { @@ -287,6 +300,9 @@ public class ScanCommand extends Command timeoutOption.setArgName("timeout"); outputFileOpt.setArgName("file"); + profileOpt = new Option("pn", "profile", true, "iterator profile name"); + profileOpt.setArgName("profile"); + o.addOption(scanOptAuths); o.addOption(scanOptRow); o.addOption(OptUtil.startRowOpt()); @@ -303,6 +319,7 @@ public class ScanCommand extends Command o.addOption(formatterInterpeterOpt); o.addOption(timeoutOption); o.addOption(outputFileOpt); + o.addOption(profileOpt); return o; } Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java Fri Jan 25 05:28:53 2013 @@ -57,7 +57,6 @@ public class SetIterCommand extends Comm public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, ShellCommandException { - final String tableName = OptUtil.getTableOpt(cl, shellState); final int priority = Integer.parseInt(cl.getOptionValue(priorityOpt.getOpt())); @@ -87,14 +86,18 @@ public class SetIterCommand extends Comm throw new ShellCommandException(ErrorCode.INITIALIZATION_FAILURE, "Servers are unable to load " + aggregatorClass + " as type " + Aggregator.class.getName()); } - setTableProperties(cl, shellState, tableName, priority, options, classname, name); + setTableProperties(cl, shellState, priority, options, classname, name); return 0; } - protected void setTableProperties(final CommandLine cl, final Shell shellState, final String tableName, final int priority, final Map<String,String> options, final String classname, final String name) + protected void setTableProperties(final CommandLine cl, final Shell shellState, final int priority, final Map<String,String> options, final String classname, + final String name) throws AccumuloException, AccumuloSecurityException, ShellCommandException, TableNotFoundException { // remove empty values + + final String tableName = OptUtil.getTableOpt(cl, shellState); + for (Iterator<Entry<String,String>> i = options.entrySet().iterator(); i.hasNext();) { final Entry<String,String> entry = i.next(); if (entry.getValue() == null || entry.getValue().isEmpty()) { Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java Fri Jan 25 05:28:53 2013 @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -43,17 +44,29 @@ public class SetScanIterCommand extends @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, ShellCommandException { + Shell.log.warn("Deprecated, use " + new SetShellIterCommand().getName()); return super.execute(fullCommand, cl, shellState); } @Override - protected void setTableProperties(final CommandLine cl, final Shell shellState, final String tableName, final int priority, final Map<String,String> options, final String classname, final String name) - throws AccumuloException, AccumuloSecurityException, ShellCommandException, TableNotFoundException { + protected void setTableProperties(final CommandLine cl, final Shell shellState, final int priority, final Map<String,String> options, final String classname, + final String name) throws AccumuloException, AccumuloSecurityException, ShellCommandException, TableNotFoundException { + + final String tableName = OptUtil.getTableOpt(cl, shellState); + // instead of setting table properties, just put the options in a list to use at scan time if (!shellState.getConnector().instanceOperations().testClassLoad(classname, SortedKeyValueIterator.class.getName())) { throw new ShellCommandException(ErrorCode.INITIALIZATION_FAILURE, "Servers are unable to load " + classname + " as type " + SortedKeyValueIterator.class.getName()); } + + for (Iterator<Entry<String,String>> i = options.entrySet().iterator(); i.hasNext();) { + final Entry<String,String> entry = i.next(); + if (entry.getValue() == null || entry.getValue().isEmpty()) { + i.remove(); + } + } + List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(tableName); if (tableScanIterators == null) { tableScanIterators = new ArrayList<IteratorSetting>(); Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java ------------------------------------------------------------------------------ Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1438310-1438327 Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java ------------------------------------------------------------------------------ Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1438310-1438327 Propchange: accumulo/branches/ACCUMULO-259/packages/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/packages:r1438310-1438327 Propchange: accumulo/branches/ACCUMULO-259/server/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/server:r1438310-1438327 Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original) +++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Fri Jan 25 05:28:53 2013 @@ -217,14 +217,13 @@ public class CompactRange extends Master long flushID = Long.parseLong(new String(tokens[0])); flushID++; + if (tokens.length > 1) { + throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, + "Another compaction with iterators is running"); + } + String txidString = String.format("%016x", tid); StringBuilder encodedIterators = new StringBuilder(); - for (int i = 1; i < tokens.length; i++) { - if (tokens[i].startsWith(txidString)) - continue; // skip self - encodedIterators.append(","); - encodedIterators.append(tokens[i]); - } if (iterators != null && iterators.getIterators().size() > 0) { Hex hex = new Hex(); Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1438328&r1=1438327&r2=1438328&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original) +++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Fri Jan 25 05:28:53 2013 @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.cloudtrace.instrument.Span; import org.apache.accumulo.cloudtrace.instrument.Trace; @@ -38,11 +39,12 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; -import org.apache.accumulo.core.iterators.system.CountingIterator; import org.apache.accumulo.core.iterators.system.DeletingIterator; import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.iterators.system.TimeSettingIterator; @@ -67,6 +69,43 @@ import org.apache.log4j.Logger; public class Compactor implements Callable<CompactionStats> { + public class CountingIterator extends WrappingIterator { + + private long count; + + public CountingIterator deepCopy(IteratorEnvironment env) { + return new CountingIterator(this, env); + } + + private CountingIterator(CountingIterator other, IteratorEnvironment env) { + setSource(other.getSource().deepCopy(env)); + count = 0; + } + + public CountingIterator(SortedKeyValueIterator<Key,Value> source) { + this.setSource(source); + count = 0; + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public void next() throws IOException { + super.next(); + count++; + if (count % 1024 == 0) { + entriesRead.addAndGet(1024); + } + } + + public long getCount() { + return count; + } + } + private static final Logger log = Logger.getLogger(Compactor.class); static class CompactionCanceledException extends Exception { @@ -93,37 +132,22 @@ public class Compactor implements Callab // things to report private String currentLocalityGroup = ""; private long startTime; - private long currentEntriesRead = 0; - private long currentEntriesWritten = 0; - private long totalEntriesRead = 0; - private long totalEntriesWritten = 0; + private MajorCompactionReason reason; protected MinorCompactionReason mincReason; - private synchronized void updateStats(long read, long written) { - this.currentEntriesRead = read; - this.currentEntriesWritten = written; - } - - private synchronized void clearStats() { - totalEntriesRead = 0; - totalEntriesWritten = 0; - currentEntriesRead = 0; - currentEntriesWritten = 0; - currentLocalityGroup = ""; - } - - private synchronized void rollStats() { - this.totalEntriesRead = currentEntriesRead; - this.totalEntriesWritten = currentEntriesWritten; - currentEntriesRead = 0; - currentEntriesWritten = 0; - } + private AtomicLong entriesRead = new AtomicLong(0); + private AtomicLong entriesWritten = new AtomicLong(0); private synchronized void setLocalityGroup(String name) { this.currentLocalityGroup = name; } + private void clearStats() { + entriesRead.set(0); + entriesWritten.set(0); + } + protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>()); public static class CompactionInfo { @@ -134,13 +158,9 @@ public class Compactor implements Callab private long entriesWritten; CompactionInfo(Compactor compactor) { - // get a consistent snapshot of changing stats - synchronized (compactor) { - this.localityGroup = compactor.currentLocalityGroup; - this.entriesRead = compactor.totalEntriesRead + compactor.currentEntriesRead; - this.entriesWritten = compactor.totalEntriesWritten + compactor.currentEntriesWritten; - } - + this.localityGroup = compactor.currentLocalityGroup; + this.entriesRead = compactor.entriesRead.get(); + this.entriesWritten = compactor.entriesWritten.get(); this.compactor = compactor; } @@ -334,7 +354,7 @@ public class Compactor implements Callab } } } - + private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException { List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size()); @@ -429,12 +449,10 @@ public class Compactor implements Callab entriesCompacted++; if (entriesCompacted % 1024 == 0) { - // Periodically update stats, do not want to do this too often since its syncronized - updateStats(citr.getCount(), entriesCompacted); + // Periodically update stats, do not want to do this too often since its volatile + entriesWritten.addAndGet(1024); } } - - rollStats(); if (itr.hasTop() && !env.isCompactionEnabled()) { // cancel major compaction operation Propchange: accumulo/branches/ACCUMULO-259/src/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/src:r1438310-1438327