http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java index 08bff26..e29d30c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java @@ -17,9 +17,20 @@ package org.apache.accumulo.tserver.compaction; import java.io.IOException; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.function.Predicate; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.rfile.RFile.WriterOptions; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.Summarizer.Combiner; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; @@ -27,11 +38,21 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.impl.TabletIdImpl; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import 
org.apache.accumulo.core.summary.SummaryReader; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.tserver.compaction.strategies.TooManyDeletesCompactionStrategy; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import com.google.common.base.Preconditions; + /** * Information that can be used to determine how a tablet is to be major compacted, if needed. */ @@ -40,18 +61,27 @@ public class MajorCompactionRequest implements Cloneable { final private MajorCompactionReason reason; final private VolumeManager volumeManager; final private AccumuloConfiguration tableConfig; + final private BlockCache indexCache; + final private BlockCache summaryCache; private Map<FileRef,DataFileValue> files; - public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason, VolumeManager manager, AccumuloConfiguration tabletConfig) { + public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason, VolumeManager manager, AccumuloConfiguration tabletConfig, + BlockCache summaryCache, BlockCache indexCache) { this.extent = extent; this.reason = reason; this.volumeManager = manager; this.tableConfig = tabletConfig; this.files = Collections.emptyMap(); + this.summaryCache = summaryCache; + this.indexCache = indexCache; + } + + public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason, AccumuloConfiguration tabletConfig) { + this(extent, reason, null, tabletConfig, null, null); } public MajorCompactionRequest(MajorCompactionRequest mcr) { - this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig); + this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig, mcr.summaryCache, mcr.indexCache); // 
know this is already unmodifiable, no need to wrap again this.files = mcr.files; } @@ -68,11 +98,61 @@ public class MajorCompactionRequest implements Cloneable { return files; } + /** + * Returns all summaries present in each file. + * + * <p> + * This method can only be called from {@link CompactionStrategy#gatherInformation(MajorCompactionRequest)}. Unfortunately, {@code gatherInformation()} is not + * called before {@link CompactionStrategy#shouldCompact(MajorCompactionRequest)}. Therefore {@code shouldCompact()} should just return true when a compaction strategy + * wants to use summary information. + * + * <p> + * When using summaries to make compaction decisions, it's important to ensure that all summary data fits in the tablet server summary cache. The size of this + * cache is configured by {@code tserver.cache.summary.size}. Also it's important to use the summarySelector predicate to only retrieve the needed summary data. + * Otherwise unneeded summary data could be brought into the cache. + * + * <p> + * Some files may contain data outside of a tablet's range. When {@link Summarizer}s generate small amounts of summary data, multiple summaries may be stored + * within a file for different row ranges. This will allow more accurate summaries to be returned for the case where a file has data outside a tablet's range. + * However, some summary data outside of the tablet's range may still be included. When this happens {@link FileStatistics#getExtra()} will be non-zero. Also, + * it's good to be aware of the other potential causes of inaccuracies, see {@link FileStatistics#getInaccurate()}. + * + * <p> + * When this method is called with multiple files, it will automatically merge summary data using {@link Combiner#merge(Map, Map)}. If summary information is + * needed for each file, then just call this method for each file. + * + * <p> + * Writing a compaction strategy that uses summary information is a bit tricky. 
See the source code for {@link TooManyDeletesCompactionStrategy} as an example + * of a compaction strategy. + * + * @see Summarizer + * @see TableOperations#addSummarizers(String, SummarizerConfiguration...) + * @see AccumuloFileOutputFormat#setSummarizers(org.apache.hadoop.mapred.JobConf, SummarizerConfiguration...) + * @see WriterOptions#withSummarizers(SummarizerConfiguration...) + */ + public List<Summary> getSummaries(Collection<FileRef> files, Predicate<SummarizerConfiguration> summarySelector) throws IOException { + Preconditions.checkState(volumeManager != null, + "Getting summaries is not supported at this time. Its only supported when CompactionStrategy.gatherInformation() is called."); + SummaryCollection sc = new SummaryCollection(); + SummarizerFactory factory = new SummarizerFactory(tableConfig); + for (FileRef file : files) { + FileSystem fs = volumeManager.getVolumeByPath(file.path()).getFileSystem(); + Configuration conf = CachedConfiguration.getInstance(); + SummaryCollection fsc = SummaryReader.load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache, indexCache).getSummaries( + Collections.singletonList(new Gatherer.RowRange(extent))); + sc.merge(fsc, factory); + } + + return sc.getSummaries(); + } + public void setFiles(Map<FileRef,DataFileValue> update) { this.files = Collections.unmodifiableMap(update); } public FileSKVIterator openReader(FileRef ref) throws IOException { + Preconditions.checkState(volumeManager != null, + "Opening files is not supported at this time. Its only supported when CompactionStrategy.gatherInformation() is called."); // @TODO verify the file isn't some random file in HDFS // @TODO ensure these files are always closed? FileOperations fileFactory = FileOperations.getInstance();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java index 5ec175b..9bd5dd3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java @@ -18,8 +18,11 @@ package org.apache.accumulo.tserver.compaction.strategies; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -27,6 +30,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.regex.Pattern; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; import org.apache.accumulo.core.compaction.CompactionSettings; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.file.FileSKVIterator; @@ -48,6 +53,84 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { abstract boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request); } + private static class SummaryTest extends Test { + + private boolean selectExtraSummary; + private boolean selectNoSummary; + + private boolean summaryConfigured = true; + private boolean gatherCalled = false; + + // files that do not need compaction + private Set<FileRef> okFiles = Collections.emptySet(); 
+ + public SummaryTest(boolean selectExtraSummary, boolean selectNoSummary) { + this.selectExtraSummary = selectExtraSummary; + this.selectNoSummary = selectNoSummary; + } + + @Override + void gatherInformation(MajorCompactionRequest request) { + gatherCalled = true; + Collection<SummarizerConfiguration> configs = SummarizerConfiguration.fromTableProperties(request.getTableProperties()); + if (configs.size() == 0) { + summaryConfigured = false; + } else { + Set<SummarizerConfiguration> configsSet = configs instanceof Set ? (Set<SummarizerConfiguration>) configs : new HashSet<>(configs); + okFiles = new HashSet<>(); + + for (FileRef fref : request.getFiles().keySet()) { + Map<SummarizerConfiguration,Summary> sMap = new HashMap<>(); + Collection<Summary> summaries; + try { + summaries = request.getSummaries(Collections.singletonList(fref), conf -> configsSet.contains(conf)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + for (Summary summary : summaries) { + sMap.put(summary.getSummarizerConfiguration(), summary); + } + + boolean needsCompaction = false; + for (SummarizerConfiguration sc : configs) { + Summary summary = sMap.get(sc); + + if (summary == null && selectNoSummary) { + needsCompaction = true; + break; + } + + if (summary != null && summary.getFileStatistics().getExtra() > 0 && selectExtraSummary) { + needsCompaction = true; + break; + } + } + + if (!needsCompaction) { + okFiles.add(fref); + } + } + } + + } + + @Override + public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) { + + if (!gatherCalled) { + Collection<SummarizerConfiguration> configs = SummarizerConfiguration.fromTableProperties(request.getTableProperties()); + return configs.size() > 0; + } + + if (!summaryConfigured) { + return false; + } + + // Its possible the set of files could change between gather and now. So this will default to compacting any files that are unknown. 
+ return !okFiles.contains(file.getKey()); + } + } + private static class NoSampleTest extends Test { private Set<FileRef> filesWithSample = Collections.emptySet(); @@ -69,7 +152,7 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { filesWithSample.add(fref); } } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException(e); } } } @@ -130,10 +213,19 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { @Override public void init(Map<String,String> options) { + boolean selectNoSummary = false; + boolean selectExtraSummary = false; + Set<Entry<String,String>> es = options.entrySet(); for (Entry<String,String> entry : es) { switch (CompactionSettings.valueOf(entry.getKey())) { + case SF_EXTRA_SUMMARY: + selectExtraSummary = true; + break; + case SF_NO_SUMMARY: + selectNoSummary = true; + break; case SF_NO_SAMPLE: tests.add(new NoSampleTest()); break; @@ -191,6 +283,11 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { throw new IllegalArgumentException("Unknown option " + entry.getKey()); } } + + if (selectExtraSummary || selectNoSummary) { + tests.add(new SummaryTest(selectExtraSummary, selectNoSummary)); + } + } private List<FileRef> getFilesToCompact(MajorCompactionRequest request) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/TooManyDeletesCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/TooManyDeletesCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/TooManyDeletesCompactionStrategy.java new file mode 100644 index 0000000..6399623 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/TooManyDeletesCompactionStrategy.java @@ -0,0 +1,173 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.tserver.compaction.strategies; + +import static org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.DELETES_STAT; +import static org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.TOTAL_STAT; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Predicate; + +import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.rfile.RFile.WriterOptions; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.tserver.compaction.CompactionPlan; +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy; +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 
This compaction strategy works in concert with the {@link DeletesSummarizer}. Using the statistics from DeletesSummarizer this strategy will compact all files + * in a table when the ratio of deletes to non-deletes meets or exceeds a threshold. + * + * <p> + * This strategy has two options. First the {@value #THRESHOLD_OPT} option allows setting the point at which a compaction will be triggered. This option + * defaults to {@value #THRESHOLD_OPT_DEFAULT} and must be in the range (0.0, 1.0]. The second option is {@value #PROCEED_ZERO_NO_SUMMARY_OPT} which determines + * if the strategy should proceed when a bulk imported file has no summary information. + * + * <p> + * If the delete summarizer was configured on a table that already had files, then those files will have no summary information. This strategy can still + * proceed in this situation. It will fall back to using Accumulo's estimated entries per file in this case. For the files without summary information the + * estimated number of deletes will be zero. This fallback method will underestimate deletes which will not lead to false positives, except for the case of + * bulk imported files. Accumulo estimates that bulk imported files have zero entries. The second option {@value #PROCEED_ZERO_NO_SUMMARY_OPT} determines if + * this strategy should proceed when it sees bulk imported files that do not have summary data. This option defaults to + * {@value #PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT}. + * + * <p> + * Bulk files can be generated with summary information by calling + * {@link AccumuloFileOutputFormat#setSummarizers(org.apache.hadoop.mapred.JobConf, SummarizerConfiguration...)} or + * {@link WriterOptions#withSummarizers(SummarizerConfiguration...)}. + * + * <p> + * When this strategy does not decide to compact based on the number of deletes, then it will defer the decision to the {@link DefaultCompactionStrategy}. 
+ * + * <p> + * Configuring this compaction strategy for a table will cause it to always queue compactions, even though it may not decide to compact. These queued + * compactions may show up on the Accumulo monitor page. This is because summary data can not be read until after compaction is queued and dequeued. When the + * compaction is dequeued it can then decide not to compact. See <a href=https://issues.apache.org/jira/browse/ACCUMULO-4573>ACCUMULO-4573</a> + * + * @since 2.0.0 + */ +public class TooManyDeletesCompactionStrategy extends DefaultCompactionStrategy { + + private boolean shouldCompact = false; + + Logger log = LoggerFactory.getLogger(TooManyDeletesCompactionStrategy.class); + + private double threshold; + + private boolean proceed_bns; + + /** + * This option should be a floating point number between 1 and 0. + */ + public static final String THRESHOLD_OPT = "threshold"; + + /** + * The default threshold. + */ + public static final String THRESHOLD_OPT_DEFAULT = ".25"; + + public static final String PROCEED_ZERO_NO_SUMMARY_OPT = "proceed_zero_no_summary"; + + public static final String PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT = "false"; + + public void init(Map<String,String> options) { + this.threshold = Double.parseDouble(options.getOrDefault(THRESHOLD_OPT, THRESHOLD_OPT_DEFAULT)); + if (threshold <= 0.0 || threshold > 1.0) { + throw new IllegalArgumentException("Threshold must be in range (0.0, 1.0], saw : " + threshold); + } + + this.proceed_bns = Boolean.parseBoolean(options.getOrDefault(PROCEED_ZERO_NO_SUMMARY_OPT, PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT)); + } + + @Override + public boolean shouldCompact(MajorCompactionRequest request) { + Collection<SummarizerConfiguration> configuredSummarizers = SummarizerConfiguration.fromTableProperties(request.getTableProperties()); + + // check if delete summarizer is configured for table + if (configuredSummarizers.stream().map(sc -> sc.getClassName()).anyMatch(cn -> 
cn.equals(DeletesSummarizer.class.getName()))) { + // This is called before gatherInformation, so need to always queue for compaction until info can be gathered. Also its not safe to request summary + // information here as its a blocking operation. Blocking operations are not allowed in shouldCompact. + return true; + } else { + return super.shouldCompact(request); + } + } + + @Override + public void gatherInformation(MajorCompactionRequest request) throws IOException { + super.gatherInformation(request); + + Predicate<SummarizerConfiguration> summarizerPredicate = conf -> conf.getClassName().equals(DeletesSummarizer.class.getName()) + && conf.getOptions().isEmpty(); + + long total = 0; + long deletes = 0; + + for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { + Collection<Summary> summaries = request.getSummaries(Collections.singleton(entry.getKey()), summarizerPredicate); + if (summaries.size() == 1) { + Summary summary = summaries.iterator().next(); + total += summary.getStatistics().get(TOTAL_STAT); + deletes += summary.getStatistics().get(DELETES_STAT); + } else { + long numEntries = entry.getValue().getNumEntries(); + if (numEntries == 0 && !proceed_bns) { + shouldCompact = false; + return; + } else { + // no summary data so use Accumulo's estimate of total entries in file + total += entry.getValue().getNumEntries(); + } + } + } + + long nonDeletes = total - deletes; + + if (nonDeletes >= 0) { + // check nonDeletes >= 0 because if this is not true then its clear evidence that the estimates are off + + double ratio = deletes / (double) nonDeletes; + shouldCompact = ratio >= threshold; + } else { + shouldCompact = false; + } + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) { + if (shouldCompact) { + CompactionPlan cp = new CompactionPlan(); + cp.inputFiles.addAll(request.getFiles().keySet()); + return cp; + } + + // fall back to default + return super.getCompactionPlan(request); + } +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SummarySession.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SummarySession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SummarySession.java new file mode 100644 index 0000000..7d9145e --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SummarySession.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.tserver.session; + +import java.util.concurrent.Future; + +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.summary.SummaryCollection; + +public class SummarySession extends Session { + + private Future<SummaryCollection> future; + + public SummarySession(TCredentials credentials, Future<SummaryCollection> future) { + super(credentials); + this.future = future; + } + + public Future<SummaryCollection> getFuture() { + return future; + } + + @Override + public boolean cleanup() { + return future.cancel(true); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index f0c0695..e4d32a6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -73,6 +73,7 @@ import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.data.thrift.MapFileInfo; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -1667,7 +1668,9 @@ public class Tablet implements TabletCommitter { } if (strategy != null) { - MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, getTabletServer().getFileSystem(), tableConfiguration); + BlockCache sc = tabletResources.getTabletServerResourceManager().getSummaryCache(); 
+ BlockCache ic = tabletResources.getTabletServerResourceManager().getIndexCache(); + MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, getTabletServer().getFileSystem(), tableConfiguration, sc, ic); request.setFiles(getDatafileManager().getDatafileSizes()); strategy.gatherInformation(request); } @@ -1711,7 +1714,7 @@ public class Tablet implements TabletCommitter { // enforce rules: files with keys outside our range need to be compacted inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, allFiles.keySet())); } else { - MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, tableConfiguration); + MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, tableConfiguration); request.setFiles(allFiles); plan = strategy.getCompactionPlan(request); if (plan != null) { @@ -2463,7 +2466,7 @@ public class Tablet implements TabletCommitter { CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy(); CompactionStrategy strategy = createCompactionStrategy(strategyConfig); - MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration); + MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, tableConfiguration); request.setFiles(getDatafileManager().getDatafileSizes()); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java index e54e1c8..0cb2ab2 100644 --- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java @@ -151,7 +151,7 @@ public class DefaultCompactionStrategyTest { } TestCompactionRequest(KeyExtent extent, MajorCompactionReason reason, Map<FileRef,DataFileValue> files) { - super(extent, reason, null, dfault); + super(extent, reason, dfault); setFiles(files); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java index 648f451..91e9860 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java @@ -51,7 +51,7 @@ public class SizeLimitCompactionStrategyTest { slcs.init(opts); KeyExtent ke = new KeyExtent("0", null, null); - MajorCompactionRequest mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, AccumuloConfiguration.getDefaultConfiguration()); + MajorCompactionRequest mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, AccumuloConfiguration.getDefaultConfiguration()); mcr.setFiles(nfl("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G")); http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java index 6fb37da..be73cb2 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java @@ -62,7 +62,7 @@ public class TwoTierCompactionStrategyTest { ttcs.init(opts); conf = AccumuloConfiguration.getDefaultConfiguration(); KeyExtent ke = new KeyExtent("0", null, null); - mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, conf); + mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, conf); Map<FileRef,DataFileValue> fileMap = createFileMap("f1", "10M", "f2", "10M", "f3", "10M", "f4", "10M", "f5", "100M", "f6", "100M", "f7", "100M", "f8", "100M"); mcr.setFiles(fileMap); @@ -81,7 +81,7 @@ public class TwoTierCompactionStrategyTest { ttcs.init(opts); conf = AccumuloConfiguration.getDefaultConfiguration(); KeyExtent ke = new KeyExtent("0", null, null); - mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, conf); + mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, conf); Map<FileRef,DataFileValue> fileMap = createFileMap("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G"); mcr.setFiles(fileMap); @@ -110,7 +110,7 @@ public class TwoTierCompactionStrategyTest { ttcs.init(opts); conf = AccumuloConfiguration.getDefaultConfiguration(); KeyExtent ke = new KeyExtent("0", null, null); - mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, conf); + mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, conf); Map<FileRef,DataFileValue> fileMap = createFileMap("f1", "1G", "f2", "10M", "f3", "10M", "f4", "10M", "f5", "10M", "f6", "10M", "f7", "10M"); Map<FileRef,DataFileValue> filesToCompactMap = createFileMap("f2", 
"10M", "f3", "10M", "f4", "10M", "f5", "10M", "f6", "10M", "f7", "10M"); mcr.setFiles(fileMap); http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java index d2a1fe4..ce43665 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java @@ -37,7 +37,7 @@ public class ConfigurableCompactionStrategyTest { @Test public void testOutputOptions() throws Exception { - MajorCompactionRequest mcr = new MajorCompactionRequest(new KeyExtent("1", null, null), MajorCompactionReason.USER, null, null); + MajorCompactionRequest mcr = new MajorCompactionRequest(new KeyExtent("1", null, null), MajorCompactionReason.USER, null); Map<FileRef,DataFileValue> files = new HashMap<>(); files.put(new FileRef("hdfs://nn1/accumulo/tables/1/t-009/F00001.rf"), new DataFileValue(50000, 400)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/shell/src/main/java/org/apache/accumulo/shell/Shell.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java index 73f4a42..4649a7e 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java @@ -152,6 +152,7 @@ import org.apache.accumulo.shell.commands.SetIterCommand; import 
org.apache.accumulo.shell.commands.SetScanIterCommand; import org.apache.accumulo.shell.commands.SetShellIterCommand; import org.apache.accumulo.shell.commands.SleepCommand; +import org.apache.accumulo.shell.commands.SummariesCommand; import org.apache.accumulo.shell.commands.SystemPermissionsCommand; import org.apache.accumulo.shell.commands.TableCommand; import org.apache.accumulo.shell.commands.TablePermissionsCommand; @@ -415,7 +416,8 @@ public class Shell extends ShellOptions implements KeywordExecutable { new TableCommand(), new UserCommand(), new WhoAmICommand()}; Command[] tableCommands = {new CloneTableCommand(), new ConfigCommand(), new CreateTableCommand(), new DeleteTableCommand(), new DropTableCommand(), new DUCommand(), new ExportTableCommand(), new ImportTableCommand(), new OfflineCommand(), new OnlineCommand(), new RenameTableCommand(), - new TablesCommand(), new NamespacesCommand(), new CreateNamespaceCommand(), new DeleteNamespaceCommand(), new RenameNamespaceCommand()}; + new TablesCommand(), new NamespacesCommand(), new CreateNamespaceCommand(), new DeleteNamespaceCommand(), new RenameNamespaceCommand(), + new SummariesCommand()}; Command[] tableControlCommands = {new AddSplitsCommand(), new CompactCommand(), new ConstraintCommand(), new FlushCommand(), new GetGroupsCommand(), new GetSplitsCommand(), new MergeCommand(), new SetGroupsCommand()}; Command[] userCommands = {new AddAuthsCommand(), new CreateUserCommand(), new DeleteUserCommand(), new DropUserCommand(), new GetAuthsCommand(), http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java index c8b0e11..31003b0 100644 --- 
a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java @@ -38,7 +38,7 @@ public class CompactCommand extends TableOperation { // file selection and file output options private Option enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outBlockSizeOpt, outHdfsBlockSizeOpt, outIndexBlockSizeOpt, - outCompressionOpt, outReplication, enoSampleOption; + outCompressionOpt, outReplication, enoSampleOption, extraSummaryOption, enoSummaryOption; private CompactionConfig compactionConfig = null; @@ -89,6 +89,8 @@ public class CompactCommand extends TableOperation { private Map<String,String> getConfigurableCompactionStrategyOpts(CommandLine cl) { Map<String,String> opts = new HashMap<>(); + put(cl, opts, extraSummaryOption, CompactionSettings.SF_EXTRA_SUMMARY); + put(cl, opts, enoSummaryOption, CompactionSettings.SF_NO_SUMMARY); put(cl, opts, enoSampleOption, CompactionSettings.SF_NO_SAMPLE); put(cl, opts, enameOption, CompactionSettings.SF_NAME_RE_OPT); put(cl, opts, epathOption, CompactionSettings.SF_PATH_RE_OPT); @@ -191,6 +193,10 @@ public class CompactCommand extends TableOperation { cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions"); opts.addOption(cancelOpt); + enoSummaryOption = new Option(null, "sf-no-summary", false, "Select files that do not have the summaries specified in the table configuration."); + opts.addOption(enoSummaryOption); + extraSummaryOption = new Option(null, "sf-extra-summary", false, "Select files that have summary information which exceeds the tablets boundries."); + opts.addOption(extraSummaryOption); enoSampleOption = new Option(null, "sf-no-sample", false, "Select files that have no sample data or sample data that differes from the table configuration."); opts.addOption(enoSampleOption); 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/shell/src/main/java/org/apache/accumulo/shell/commands/SummariesCommand.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/SummariesCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/SummariesCommand.java new file mode 100644 index 0000000..e0e6d2a --- /dev/null +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SummariesCommand.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.shell.commands; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.SummaryRetriever; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.shell.Shell; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.hadoop.io.Text; +/** + * Shell command that retrieves summary statistics for a table and prints them, optionally limited to a row range and to summaries whose summarizer configuration matches a regular expression. + */ +public class SummariesCommand extends TableOperation { + /* Settings captured from the command line by execute() and read by doTableOp(). */ + private Text startRow; + private Text endRow; + private boolean paginate; + private String selectionRegex = ".*"; + /* Option handles created in getOptions() and looked up again in execute(). */ + private Option disablePaginationOpt; + private Option summarySelectionOpt; + /** + * Returns a short description of what this command does. + */ + @Override + public String description() { + return "retrieves summary statistics"; + } + /** + * Retrieves the summaries of {@code tableName} whose configuration matches the selection regex, restricted to the start and end rows when those are set, and prints each one as a summarizer line, a file statistics line and one line per statistic. + * + * @param shellState shell used to obtain the connector and to print the result + * @param tableName table whose summaries are retrieved + */ + @Override + protected void doTableOp(final Shell shellState, final String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + IOException { + Connector conn = shellState.getConnector(); + SummaryRetriever retriever = conn.tableOperations().summaries(tableName).withMatchingConfiguration(selectionRegex); + if (startRow != null) { + retriever.startRow(startRow); + } + + if (endRow != null) { + retriever.endRow(endRow); + } + + Collection<Summary> summaries = retriever.retrieve(); + + ArrayList<String> lines = new ArrayList<>(); + /* addEmpty puts an empty line between consecutive summaries, but not before the first one. */ + boolean addEmpty = false; + for (Summary summary : summaries) { + if (addEmpty) + lines.add(""); + addEmpty = true; + lines.add(String.format(" Summarizer : %s", summary.getSummarizerConfiguration())); + lines.add(String.format(" File Statistics : %s", summary.getFileStatistics())); + lines.add(String.format(" Summary
Statistics : ")); + /* Emit the statistics ordered by name. */ + Map<String,Long> stats = summary.getStatistics(); + ArrayList<String> keys = new ArrayList<>(stats.keySet()); + Collections.sort(keys); + for (String key : keys) { + lines.add(String.format(" %-60s = %,d", key, stats.get(key))); + } + } + + shellState.printLines(lines.iterator(), paginate); + + } + /** + * Reads the start row, end row, pagination flag and selection regex from the command line, then delegates to the superclass implementation. The selection regex is explicitly reset to the match-everything pattern when the option is absent. NOTE(review): presumably because the command object is reused between invocations; confirm. + * + * @return the value returned by the superclass execute method + */ + @Override + public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { + startRow = OptUtil.getStartRow(cl); + endRow = OptUtil.getEndRow(cl); + paginate = !cl.hasOption(disablePaginationOpt.getOpt()); + if (cl.hasOption(summarySelectionOpt.getOpt())) { + selectionRegex = cl.getOptionValue(summarySelectionOpt.getOpt()); + } else { + selectionRegex = ".*"; + } + return super.execute(fullCommand, cl, shellState); + } + /** + * Builds the options accepted by this command: those of the superclass plus no-pagination (np), select-regex (sr) and the start row and end row options supplied by OptUtil. As a side effect it stores the two option handles that execute() reads. + * + * @return the superclass options with this command's options added + */ + @Override + public Options getOptions() { + final Options opts = super.getOptions(); + disablePaginationOpt = new Option("np", "no-pagination", false, "disable pagination of output"); + summarySelectionOpt = new Option("sr", "select-regex", true, + "regex to select summaries.
Matches against class name and options used to generate summaries."); + opts.addOption(disablePaginationOpt); + opts.addOption(summarySelectionOpt); + opts.addOption(OptUtil.startRowOpt()); + opts.addOption(OptUtil.endRowOpt()); + return opts; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java index eae5ca9..d6d83cc 100644 --- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java @@ -54,6 +54,8 @@ import org.apache.accumulo.core.client.sample.RowSampler; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -1487,7 +1489,7 @@ public class ShellServerIT extends SharedMiniClusterBase { ts.exec("systempermissions"); assertEquals(12, ts.output.get().split("\n").length - 1); ts.exec("tablepermissions", true); - assertEquals(6, ts.output.get().split("\n").length - 1); + assertEquals(7, ts.output.get().split("\n").length - 1); } @Test @@ -1894,4 +1896,129 @@ public class ShellServerIT extends SharedMiniClusterBase { return null; } + /** Fails the test unless the entire output matches the given regular expression. */ private static void assertMatches(String output, String pattern) { + Assert.assertTrue("Pattern " + pattern + " did not match output : " + output, output.matches(pattern)); + } + /** + * Fails the test if the given subsequence occurs anywhere in the output. + */ + private static void
assertNotContains(String output, String subsequence) { + Assert.assertFalse("Expected '" + subsequence + "' would not occur in output : " + output, output.contains(subsequence)); + } + /** + * Exercises the shell summaries command against a table configured with DeletesSummarizer and FamilySummarizer. Checks the reported statistics for the whole table, for row ranges given with -b and -e, and for summarizer selection with -sr. + */ + @Test + public void testSummaries() throws Exception { + ts.exec("createtable summary"); + ts.exec("config -t summary -s table.summarizer.del=" + DeletesSummarizer.class.getName()); + ts.exec("config -t summary -s table.summarizer.fam=" + FamilySummarizer.class.getName()); + /* Split at r1 and r2, write six entries in three families, then flush. */ + ts.exec("addsplits -t summary r1 r2"); + ts.exec("insert r1 f1 q1 v1"); + ts.exec("insert r2 f2 q1 v3"); + ts.exec("insert r2 f2 q2 v4"); + ts.exec("insert r3 f3 q1 v5"); + ts.exec("insert r3 f3 q2 v6"); + ts.exec("insert r3 f3 q3 v7"); + ts.exec("flush -t summary -w"); + /* Whole table: no deletes, six entries, family counts of 1, 2 and 3. */ + String output = ts.exec("summaries"); + assertMatches(output, "(?sm).*^.*deletes\\s+=\\s+0.*$.*"); + assertMatches(output, "(?sm).*^.*total\\s+=\\s+6.*$.*"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+3.*$.*"); + /* Two deletes are expected to raise the delete count to 2 and the total to 8, with the family counts unchanged. */ + ts.exec("delete r1 f1 q2"); + ts.exec("delete r2 f2 q1"); + ts.exec("flush -t summary -w"); + + output = ts.exec("summaries"); + assertMatches(output, "(?sm).*^.*deletes\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*total\\s+=\\s+8.*$.*"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+3.*$.*"); + /* End row r2: family f3, which was only written to row r3, must not be reported. */ + output = ts.exec("summaries -e r2"); + assertMatches(output, "(?sm).*^.*deletes\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*total\\s+=\\s+5.*$.*"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertNotContains(output, "c:f3"); + /* Start row r2: only family f3 is expected. */ + output = ts.exec("summaries -b r2"); + assertMatches(output, "(?sm).*^.*deletes\\s+=\\s+0.*$.*"); + assertMatches(output, "(?sm).*^.*total\\s+=\\s+3.*$.*"); + assertNotContains(output,
"c:f1"); + assertNotContains(output, "c:f2"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+3.*$.*"); + /* Start row r1 and end row r2: only family f2 is expected. */ + output = ts.exec("summaries -b r1 -e r2"); + assertMatches(output, "(?sm).*^.*deletes\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*total\\s+=\\s+3.*$.*"); + assertNotContains(output, "c:f1"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertNotContains(output, "c:f3"); + /* Select only the family summarizer: the deletes statistic must not be reported. */ + output = ts.exec("summaries -sr .*Family.*"); + assertNotContains(output, "deletes "); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+3.*$.*"); + /* Row range and summarizer selection combined. */ + output = ts.exec("summaries -b r1 -e r2 -sr .*Family.*"); + assertNotContains(output, "deletes "); + assertNotContains(output, "c:f1"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertNotContains(output, "c:f3"); + } + /** + * Exercises the --sf-no-summary and --sf-extra-summary file selection options of the shell compact command, using the total, missing and extra counts printed by the summaries command to check the outcome of each compaction. + */ + @Test + public void testSummarySelection() throws Exception { + ts.exec("createtable summary2"); + // will create a few files and do not want them compacted + ts.exec("config -t summary2 -s " + Property.TABLE_MAJC_RATIO + "=10"); + /* The first flush happens before any summarizer is configured on the table. */ + ts.exec("insert r1 f1 q1 v1"); + ts.exec("insert r2 f2 q1 v2"); + ts.exec("flush -t summary2 -w"); + + ts.exec("config -t summary2 -s table.summarizer.fam=" + FamilySummarizer.class.getName()); + /* The second flush happens after the family summarizer is configured. */ + ts.exec("insert r1 f2 q1 v3"); + ts.exec("insert r3 f3 q1 v4"); + ts.exec("flush -t summary2 -w"); + + String output = ts.exec("summaries"); + assertNotContains(output, "c:f1"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+1.*$.*"); + // check that there are two files, with one missing summary info + assertMatches(output, "(?sm).*^.*total[:]2[,]\\s+missing[:]1[,]\\s+extra[:]0.*$.*"); + + // compact only the file missing summary info + ts.exec("compact -t summary2 --sf-no-summary -w"); + output = ts.exec("summaries"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); +
assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+1.*$.*"); + // check that there are two files, with none missing summary info + assertMatches(output, "(?sm).*^.*total[:]2[,]\\s+missing[:]0[,]\\s+extra[:]0.*$.*"); + + // create a situation where a file has summary data outside of its tablet + ts.exec("addsplits -t summary2 r2"); + output = ts.exec("summaries -e r2"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertMatches(output, "(?sm).*^.*c:f3\\s+=\\s+1.*$.*"); + // check that there are two files, with one having extra summary info + assertMatches(output, "(?sm).*^.*total[:]2[,]\\s+missing[:]0[,]\\s+extra[:]1.*$.*"); + + // compact only the files with extra summary info + ts.exec("compact -t summary2 --sf-extra-summary -w"); + output = ts.exec("summaries -e r2"); + assertMatches(output, "(?sm).*^.*c:f1\\s+=\\s+1.*$.*"); + assertMatches(output, "(?sm).*^.*c:f2\\s+=\\s+2.*$.*"); + assertNotContains(output, "c:f3"); + // check that there are two files, with none having extra summary info + assertMatches(output, "(?sm).*^.*total[:]2[,]\\s+missing[:]0[,]\\s+extra[:]0.*$.*"); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/test/src/main/java/org/apache/accumulo/test/functional/BasicSummarizer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BasicSummarizer.java b/test/src/main/java/org/apache/accumulo/test/functional/BasicSummarizer.java new file mode 100644 index 0000000..b109c7e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/BasicSummarizer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test.functional; + +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; + +/** + * This summarizer collects some very basic statistics about Keys: the total number of keys, the number of keys for which isDeleted() is true, and the minimum and maximum timestamps seen. + */ +public class BasicSummarizer implements Summarizer { + /* Names under which the statistics are reported by the collector and looked up by the combiner. */ + public static final String DELETES_STAT = "deletes"; + public static final String MIN_TIMESTAMP_STAT = "minTimestamp"; + public static final String MAX_TIMESTAMP_STAT = "maxTimestamp"; + public static final String TOTAL_STAT = "total"; + /** + * Returns a collector that tracks the smallest and largest timestamp, the number of deleted keys and the total number of keys passed to accept, and reports all four from summarize. The configuration argument is not consulted. If accept is never called, minTimestamp is reported as Long.MAX_VALUE and maxTimestamp as Long.MIN_VALUE. + */ + @Override + public Collector collector(SummarizerConfiguration sc) { + return new Collector() { + /* The timestamp bounds start at the opposite extremes so that any timestamp seen narrows them. */ + private long minStamp = Long.MAX_VALUE; + private long maxStamp = Long.MIN_VALUE; + private long deletes = 0; + private long total = 0; + + @Override + public void accept(Key k, Value v) { + if (k.getTimestamp() < minStamp) { + minStamp = k.getTimestamp(); + } + + if (k.getTimestamp() > maxStamp) { + maxStamp = k.getTimestamp(); + } + + if (k.isDeleted()) { + deletes++; + } + + total++; + } + + @Override + public void summarize(StatisticConsumer sc) { + sc.accept(MIN_TIMESTAMP_STAT, minStamp); + sc.accept(MAX_TIMESTAMP_STAT, maxStamp); + sc.accept(DELETES_STAT, deletes); + sc.accept(TOTAL_STAT, total); + } + }; + } + /** + * Returns a combiner that folds stats2 into stats1: deletes and total are added, minTimestamp keeps the smaller value and maxTimestamp keeps the larger value. The configuration argument is not consulted. NOTE(review): the values read from stats2 are passed straight to merge, so this presumably relies on stats2 always holding all four statistics; confirm that a missing entry cannot occur. + */ + @Override + public Combiner
combiner(SummarizerConfiguration sc) { + return (stats1, stats2) -> { + stats1.merge(DELETES_STAT, stats2.get(DELETES_STAT), Long::sum); + stats1.merge(TOTAL_STAT, stats2.get(TOTAL_STAT), Long::sum); + stats1.merge(MIN_TIMESTAMP_STAT, stats2.get(MIN_TIMESTAMP_STAT), Long::min); + stats1.merge(MAX_TIMESTAMP_STAT, stats2.get(MAX_TIMESTAMP_STAT), Long::max); + }; + } +}