ACCUMULO-4501 ACCUMULO-96 Added Summarization closes apache/accumulo#224 closes apache/accumulo#168
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94cdcc4d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94cdcc4d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94cdcc4d Branch: refs/heads/master Commit: 94cdcc4d3f0a8ccf95894f206cb71e6117f4e51d Parents: 68ba2ef Author: Keith Turner <ktur...@apache.org> Authored: Mon Mar 20 10:47:00 2017 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Mar 20 10:47:00 2017 -0400 ---------------------------------------------------------------------- .../client/admin/NewTableConfiguration.java | 49 +- .../core/client/admin/SummaryRetriever.java | 112 + .../core/client/admin/TableOperations.java | 65 + .../accumulo/core/client/impl/ServerClient.java | 36 +- .../core/client/impl/TableOperationsImpl.java | 151 +- .../client/mapred/AccumuloFileOutputFormat.java | 16 + .../mapreduce/AccumuloFileOutputFormat.java | 16 + .../lib/impl/FileOutputConfigurator.java | 9 + .../core/client/mock/MockTableOperations.java | 27 + .../accumulo/core/client/rfile/RFile.java | 129 + .../core/client/rfile/RFileScannerBuilder.java | 2 + .../client/rfile/RFileSummariesRetriever.java | 122 + .../accumulo/core/client/rfile/RFileWriter.java | 26 + .../core/client/rfile/RFileWriterBuilder.java | 38 +- .../core/client/summary/CounterSummary.java | 123 + .../core/client/summary/CountingSummarizer.java | 302 + .../core/client/summary/Summarizer.java | 227 + .../client/summary/SummarizerConfiguration.java | 285 + .../accumulo/core/client/summary/Summary.java | 145 + .../summary/summarizers/DeletesSummarizer.java | 75 + .../summary/summarizers/FamilySummarizer.java | 46 + .../summarizers/VisibilitySummarizer.java | 47 + .../core/compaction/CompactionSettings.java | 2 + .../org/apache/accumulo/core/conf/Property.java | 22 +- .../apache/accumulo/core/conf/PropertyType.java | 9 +- .../accumulo/core/data/ArrayByteSequence.java | 18 + 
.../accumulo/core/data/thrift/TRowRange.java | 521 ++ .../accumulo/core/data/thrift/TSummaries.java | 831 +++ .../data/thrift/TSummarizerConfiguration.java | 649 ++ .../accumulo/core/data/thrift/TSummary.java | 842 +++ .../core/data/thrift/TSummaryRequest.java | 760 +++ .../accumulo/core/file/BloomFilterLayer.java | 10 +- .../core/file/DispatchingFileFactory.java | 7 +- .../accumulo/core/file/FileOperations.java | 18 + .../accumulo/core/file/rfile/PrintInfo.java | 7 + .../core/file/rfile/RFileOperations.java | 2 +- .../core/metadata/schema/MetadataScanner.java | 236 + .../core/metadata/schema/TabletMetadata.java | 182 + .../sample/impl/SamplerConfigurationImpl.java | 12 - .../core/sample/impl/SamplerFactory.java | 8 +- .../accumulo/core/security/TablePermission.java | 5 +- .../apache/accumulo/core/summary/Gatherer.java | 631 ++ .../summary/SummarizerConfigurationUtil.java | 128 + .../core/summary/SummarizerFactory.java | 63 + .../core/summary/SummaryCollection.java | 188 + .../accumulo/core/summary/SummaryInfo.java | 53 + .../accumulo/core/summary/SummaryReader.java | 257 + .../core/summary/SummarySerializer.java | 542 ++ .../accumulo/core/summary/SummaryWriter.java | 157 + .../thrift/TabletClientService.java | 5642 +++++++++++++++++- .../accumulo/core/util/CancelFlagFuture.java | 67 + .../core/util/CompletableFutureUtil.java | 49 + core/src/main/thrift/data.thrift | 34 + core/src/main/thrift/tabletserver.thrift | 5 + .../client/impl/TableOperationsHelperTest.java | 26 + .../mapred/AccumuloFileOutputFormatTest.java | 18 + .../mapreduce/AccumuloFileOutputFormatTest.java | 18 + .../accumulo/core/client/rfile/RFileTest.java | 158 +- .../client/summary/CountingSummarizerTest.java | 259 + .../core/summary/SummaryCollectionTest.java | 72 + .../core/util/CompletableFutureUtilTest.java | 53 + .../main/asciidoc/accumulo_user_manual.asciidoc | 2 + docs/src/main/asciidoc/chapters/summaries.txt | 232 + .../standalone/StandaloneAccumuloCluster.java | 3 +- 
.../standalone/StandaloneClusterControl.java | 3 +- .../impl/MiniAccumuloConfigImpl.java | 3 +- .../server/security/SecurityOperation.java | 5 + .../apache/accumulo/tserver/TabletServer.java | 115 + .../tserver/TabletServerResourceManager.java | 49 +- .../tserver/compaction/CompactionStrategy.java | 1 - .../compaction/MajorCompactionRequest.java | 84 +- .../ConfigurableCompactionStrategy.java | 99 +- .../TooManyDeletesCompactionStrategy.java | 173 + .../tserver/session/SummarySession.java | 42 + .../apache/accumulo/tserver/tablet/Tablet.java | 9 +- .../DefaultCompactionStrategyTest.java | 2 +- .../SizeLimitCompactionStrategyTest.java | 2 +- .../TwoTierCompactionStrategyTest.java | 6 +- .../ConfigurableCompactionStrategyTest.java | 2 +- .../java/org/apache/accumulo/shell/Shell.java | 4 +- .../accumulo/shell/commands/CompactCommand.java | 8 +- .../shell/commands/SummariesCommand.java | 115 + .../org/apache/accumulo/test/ShellServerIT.java | 129 +- .../test/functional/BasicSummarizer.java | 80 + .../accumulo/test/functional/SummaryIT.java | 820 +++ .../test/functional/TooManyDeletesIT.java | 121 + .../test/performance/thrift/NullTserver.java | 33 +- 87 files changed, 16482 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java index 4694e1e..9d5d31a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java @@ -19,14 +19,18 @@ package org.apache.accumulo.core.client.admin; import static 
com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.summary.SummarizerConfigurationUtil; /** * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to include default iterators, and user-specified initial @@ -41,8 +45,13 @@ public class NewTableConfiguration { private boolean limitVersion = true; - private Map<String,String> properties = new HashMap<>(); - private SamplerConfiguration samplerConfiguration; + private Map<String,String> properties = Collections.emptyMap(); + private Map<String,String> samplerProps = Collections.emptyMap(); + private Map<String,String> summarizerProps = Collections.emptyMap(); + + private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) { + checkArgument(Collections.disjoint(props.keySet(), derivedProps.keySet()), "Properties and derived %s properties are not disjoint", kind); + } /** * Configure logical or millisecond time for tables created with this configuration. @@ -82,15 +91,15 @@ public class NewTableConfiguration { * Sets additional properties to be applied to tables created with this configuration. Additional calls to this method replaces properties set by previous * calls. 
* - * @param prop + * @param props * additional properties to add to the table when it is created * @return this */ - public NewTableConfiguration setProperties(Map<String,String> prop) { - checkArgument(prop != null, "properties is null"); - SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration); - - this.properties = new HashMap<>(prop); + public NewTableConfiguration setProperties(Map<String,String> props) { + checkArgument(props != null, "properties is null"); + checkDisjoint(props, samplerProps, "sampler"); + checkDisjoint(props, summarizerProps, "summarizer"); + this.properties = new HashMap<>(props); return this; } @@ -106,10 +115,8 @@ public class NewTableConfiguration { propertyMap.putAll(IteratorUtil.generateInitialTableProperties(limitVersion)); } - if (samplerConfiguration != null) { - propertyMap.putAll(new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap()); - } - + propertyMap.putAll(summarizerProps); + propertyMap.putAll(samplerProps); propertyMap.putAll(properties); return Collections.unmodifiableMap(propertyMap); } @@ -121,8 +128,22 @@ public class NewTableConfiguration { */ public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) { requireNonNull(samplerConfiguration); - SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration); - this.samplerConfiguration = samplerConfiguration; + Map<String,String> tmp = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap(); + checkDisjoint(properties, tmp, "sampler"); + this.samplerProps = tmp; + return this; + } + + /** + * Enables creating summary statistics using {@link Summarizer}'s for the new table. + * + * @since 2.0.0 + */ + public NewTableConfiguration enableSummarization(SummarizerConfiguration... 
configs) { + requireNonNull(configs); + Map<String,String> tmp = SummarizerConfigurationUtil.toTablePropertiesMap(Arrays.asList(configs)); + checkDisjoint(properties, tmp, "summarizer"); + summarizerProps = tmp; return this; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java new file mode 100644 index 0000000..8dcf048 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.client.admin; + +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.hadoop.io.Text; + +/** + * This interface allows configuring where and which summary data to retrieve before retrieving it. + * + * @since 2.0.0 + */ +public interface SummaryRetriever { + + /** + * Forces a flush of data in memory to files before summary data is retrieved. Data recently written to Accumulo may be in memory. Summary data is only + * retrieved from files. Therefore recently written data may not be represented in summaries, unless this option is set to true. This is optional and + * defaults to false. + * + * @return this + */ + SummaryRetriever flush(boolean shouldFlush); + + /** + * The start row is not inclusive. Calling this method is optional. + */ + SummaryRetriever startRow(Text startRow); + + /** + * The start row is not inclusive. Calling this method is optional. + */ + SummaryRetriever startRow(CharSequence startRow); + + /** + * The end row is inclusive. Calling this method is optional. + */ + SummaryRetriever endRow(Text endRow); + + /** + * The end row is inclusive. Calling this method is optional. + */ + SummaryRetriever endRow(CharSequence endRow); + + /** + * Filters which summary data is retrieved. By default all summary data present is retrieved. If only a subset of summary data is needed, then it's best to be + * selective in order to avoid polluting summary data cache. + * + * <p> + * Each set of summary data is generated using a specific {@link SummarizerConfiguration}. 
The methods {@link #withConfiguration(Collection)} and + * {@link #withConfiguration(SummarizerConfiguration...)} allow selecting sets of summary data based on exact {@link SummarizerConfiguration} matches. This + * method enables less exact matching using regular expressions. + * + * <p> + * The regular expression passed to this method is used in the following way on the server side to match {@link SummarizerConfiguration} object. When a + * {@link SummarizerConfiguration} matches, the summary data generated using that configuration is returned. + * + * <pre> + * <code> + * boolean doesConfigurationMatch(SummarizerConfiguration conf, String regex) { + * // This is how conf is converted to a String in tablet servers for matching. + * // The options are sorted to make writing regular expressions easier. + * String confString = conf.getClassName()+" "+new TreeMap<>(conf.getOptions()); + * return Pattern.compile(regex).matcher(confString).matches(); + * } + * </code> + * </pre> + */ + SummaryRetriever withMatchingConfiguration(String regex); + + /** + * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present. + * + * <p> + * Using this method to be more selective may pull less data in to the tablet servers summary cache. + */ + SummaryRetriever withConfiguration(SummarizerConfiguration... config); + + /** + * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present. + * + * <p> + * Using this method to be more selective may pull less data in to the tablet servers summary cache. 
+ */ + SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs); + + /** + * @return a map of counter groups to counts + */ + List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException; +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java index cabcfa3..f88d28e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; +import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -34,9 +35,12 @@ import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; import org.apache.hadoop.io.Text; /** @@ -808,4 +812,65 @@ public interface TableOperations { * @since 1.8.0 */ SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException; + + /** + * Entry point for retrieving summaries with optional restrictions. + * + * <p> + * In order to retrieve Summaries, the Accumulo user making the request will need the {@link TablePermission#GET_SUMMARIES} table permission. + * + * <p> + * Accumulo stores summary data with each file in each tablet. In order to make retrieving it faster there is a per tablet server cache of summary data. When + * summary data for a file is not present, it will be retrieved using threads on the tserver. The tablet server properties + * {@code tserver.summary.partition.threads}, {@code tserver.summary.remote.threads}, {@code tserver.summary.retrieval.threads}, and + * {@code tserver.cache.summary.size} impact the performance of retrieving summaries. + * + * <p> + * Since summary data is cached, it's important to use the summary selection options to only read the needed data into the cache. + * + * <p> + * Summary data will be merged on the tablet servers and then in this client process. Therefore it's important that the required summarizers are on the + * client's classpath. + * + * @since 2.0.0 + * @see Summarizer + */ + SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException; + + /** + * Enables summary generation for this table for future compactions. + * + * @param tableName + * add summarizers to this table + * @param summarizers + * summarizers to add + * @throws IllegalArgumentException + * When new summarizers have the same property id as each other, or when a new summarizer's property id is already in use by a different summarizer configured on the table. + * @since 2.0.0 + * @see SummarizerConfiguration#toTableProperties() + * @see SummarizerConfiguration#toTableProperties(SummarizerConfiguration...) + * @see SummarizerConfiguration#toTableProperties(Collection) + */ + void addSummarizers(String tableName, SummarizerConfiguration... 
summarizers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException; + + /** + * Removes summary generation for this table for the matching summarizers. + * + * @param tableName + * remove summarizers from this table + * @param predicate + * removes all summarizers whose configuration matches this predicate + * @since 2.0.0 + */ + void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException, + AccumuloSecurityException; + + /** + * @param tableName + * list summarizers for this table + * @return the summarizers currently configured for the table + * @since 2.0.0 + * @see SummarizerConfiguration#fromTableProperties(Map) + */ + List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java index a4853f0..9d18f99 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java @@ -36,6 +36,9 @@ import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -47,8 +50,13 @@ public class ServerClient { 
private static final Logger log = LoggerFactory.getLogger(ServerClient.class); public static <T> T execute(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws AccumuloException, AccumuloSecurityException { + return execute(context, new ClientService.Client.Factory(), exec); + } + + public static <CT extends TServiceClient,RT> RT execute(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec) + throws AccumuloException, AccumuloSecurityException { try { - return executeRaw(context, exec); + return executeRaw(context, factory, exec); } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); } catch (AccumuloException e) { @@ -71,14 +79,21 @@ public class ServerClient { } public static <T> T executeRaw(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws Exception { + return executeRaw(context, new ClientService.Client.Factory(), exec); + } + + public static <CT extends TServiceClient,RT> RT executeRaw(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec) + throws Exception { while (true) { - ClientService.Client client = null; + CT client = null; String server = null; try { - Pair<String,Client> pair = ServerClient.getConnection(context); + Pair<String,CT> pair = ServerClient.getConnection(context, factory); server = pair.getFirst(); client = pair.getSecond(); return exec.execute(client); + } catch (TApplicationException tae) { + throw new AccumuloServerException(server, tae); } catch (TTransportException tte) { log.debug("ClientService request failed " + server + ", retrying ... 
", tte); sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -99,6 +114,8 @@ public class ServerClient { client = pair.getSecond(); exec.execute(client); break; + } catch (TApplicationException tae) { + throw new AccumuloServerException(server, tae); } catch (TTransportException tte) { log.debug("ClientService request failed " + server + ", retrying ... ", tte); sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -115,12 +132,21 @@ public class ServerClient { return getConnection(context, true); } + public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory) throws TTransportException { + return getConnection(context, factory, true, context.getClientTimeoutInMillis()); + } + public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException { return getConnection(context, preferCachedConnections, context.getClientTimeoutInMillis()); } public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections, long rpcTimeout) throws TTransportException { + return getConnection(context, new ClientService.Client.Factory(), preferCachedConnections, rpcTimeout); + } + + public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory, + boolean preferCachedConnections, long rpcTimeout) throws TTransportException { checkArgument(context != null, "context is null"); // create list of servers ArrayList<ThriftTransportKey> servers = new ArrayList<>(); @@ -141,7 +167,7 @@ public class ServerClient { boolean opened = false; try { Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections); - ClientService.Client client = ThriftUtil.createClient(new ClientService.Client.Factory(), pair.getSecond()); + CT client = ThriftUtil.createClient(factory, pair.getSecond()); opened 
= true; warnedAboutTServersBeingDown = false; return new Pair<>(pair.getFirst(), client); @@ -159,7 +185,7 @@ public class ServerClient { } } - public static void close(ClientService.Client client) { + public static void close(TServiceClient client) { if (client != null && client.getInputProtocol() != null && client.getInputProtocol().getTransport() != null) { ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport()); } else { http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java index 1c04a43..34b76fc 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toSet; import java.io.BufferedReader; import java.io.FileNotFoundException; @@ -38,6 +39,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.SortedSet; @@ -48,6 +50,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import 
java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -69,16 +74,19 @@ import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.client.admin.FindMax; import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.SummaryRetriever; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; -import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.impl.thrift.TDiskUsage; import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; @@ -90,6 +98,10 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.impl.TabletIdImpl; +import org.apache.accumulo.core.data.thrift.TRowRange; +import org.apache.accumulo.core.data.thrift.TSummaries; +import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration; +import org.apache.accumulo.core.data.thrift.TSummaryRequest; import org.apache.accumulo.core.iterators.IteratorUtil; import 
org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -103,6 +115,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.summary.SummarizerConfigurationUtil; +import org.apache.accumulo.core.summary.SummaryCollection; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Tracer; @@ -126,6 +140,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; public class TableOperationsImpl extends TableOperationsHelper { @@ -1661,4 +1676,138 @@ public class TableOperationsImpl extends TableOperationsHelper { return new LoctionsImpl(binnedRanges); } + + @Override + public SummaryRetriever summaries(String tableName) { + + return new SummaryRetriever() { + + private Text startRow = null; + private Text endRow = null; + private List<TSummarizerConfiguration> summariesToFetch = Collections.emptyList(); + private String summarizerClassRegex; + private boolean flush = false; + + @Override + public SummaryRetriever startRow(Text startRow) { + Objects.requireNonNull(startRow); + if (endRow != null) { + Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow); + } + this.startRow = startRow; + return this; + } + + @Override + public SummaryRetriever startRow(CharSequence startRow) { + return startRow(new Text(startRow.toString())); + } + + @Override + public List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException { + String tableId = Tables.getTableId(context.getInstance(), tableName); + if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE) + throw new TableOfflineException(context.getInstance(), tableId); + + TRowRange range = new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow)); + TSummaryRequest request = new TSummaryRequest(tableId, range, summariesToFetch, summarizerClassRegex); + if (flush) { + _flush(tableId, startRow, endRow, true); + } + + TSummaries ret = ServerClient.execute(context, new TabletClientService.Client.Factory(), client -> { + TSummaries tsr = client.startGetSummaries(Tracer.traceInfo(), context.rpcCreds(), request); + while (!tsr.finished) { + tsr = client.contiuneGetSummaries(Tracer.traceInfo(), tsr.sessionId); + } + return tsr; + }); + return new SummaryCollection(ret).getSummaries(); + } + + @Override + public SummaryRetriever endRow(Text endRow) { + Objects.requireNonNull(endRow); + if (startRow != null) { + Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow); + } + this.endRow = endRow; + return this; + } + + @Override + public SummaryRetriever endRow(CharSequence endRow) { + return endRow(new Text(endRow.toString())); + } + + @Override + public SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs) { + Objects.requireNonNull(configs); + summariesToFetch = configs.stream().map(SummarizerConfigurationUtil::toThrift).collect(Collectors.toList()); + return this; + } + + @Override + public SummaryRetriever withConfiguration(SummarizerConfiguration... config) { + Objects.requireNonNull(config); + return withConfiguration(Arrays.asList(config)); + } + + @Override + public SummaryRetriever withMatchingConfiguration(String regex) { + Objects.requireNonNull(regex); + // Do a sanity check here to make sure that regex compiles, instead of having it fail on a tserver. 
+ Pattern.compile(regex); + this.summarizerClassRegex = regex; + return this; + } + + @Override + public SummaryRetriever flush(boolean b) { + this.flush = b; + return this; + } + }; + } + + @Override + public void addSummarizers(String tableName, SummarizerConfiguration... newConfigs) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + HashSet<SummarizerConfiguration> currentConfigs = new HashSet<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName))); + HashSet<SummarizerConfiguration> newConfigSet = new HashSet<>(Arrays.asList(newConfigs)); + + newConfigSet.removeIf(sc -> currentConfigs.contains(sc)); + + Set<String> newIds = newConfigSet.stream().map(sc -> sc.getPropertyId()).collect(toSet()); + + for (SummarizerConfiguration csc : currentConfigs) { + if (newIds.contains(csc.getPropertyId())) { + throw new IllegalArgumentException("Summarizer property id is in use by " + csc); + } + } + + Set<Entry<String,String>> es = SummarizerConfiguration.toTableProperties(newConfigSet).entrySet(); + for (Entry<String,String> entry : es) { + setProperty(tableName, entry.getKey(), entry.getValue()); + } + } + + @Override + public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException, + AccumuloSecurityException { + Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(getProperties(tableName)); + for (SummarizerConfiguration sc : summarizerConfigs) { + if (predicate.test(sc)) { + Set<String> ks = sc.toTableProperties().keySet(); + for (String key : ks) { + removeProperty(tableName, key); + } + } + } + } + + @Override + public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + return new ArrayList<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName))); + } } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java index 640a85d..d7d2b2d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java @@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -136,6 +138,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { FileOutputConfigurator.setSampler(CLASS, job, samplerConfig); } + /** + * Specifies a list of summarizer configurations to create summary data in the output file. Each Key Value written will be passed to the configured + * {@link Summarizer}'s. + * + * @param job + * The Hadoop job instance to be configured + * @param sumarizerConfigs + * summarizer configurations + * @since 2.0.0 + */ + public static void setSummarizers(JobConf job, SummarizerConfiguration... 
sumarizerConfigs) { + FileOutputConfigurator.setSummarizers(CLASS, job, sumarizerConfigs); + } + @Override public RecordWriter<Key,Value> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index 656dba7..dcdd42b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -134,6 +136,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig); } + /** + * Specifies a list of summarizer configurations to create summary data in the output file. Each Key Value written will be passed to the configured + * {@link Summarizer}'s. 
+ * + * @param job + * The Hadoop job instance to be configured + * @param sumarizerConfigs + * summarizer configurations + * @since 2.0.0 + */ + public static void setSummarizers(Job job, SummarizerConfiguration... sumarizerConfigs) { + FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), sumarizerConfigs); + } + @Override public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java index 049395f..5f73e90 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java @@ -22,6 +22,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; @@ -209,4 +210,12 @@ public class FileOutputConfigurator extends ConfiguratorBase { } } + public static void setSummarizers(Class<?> implementingClass, Configuration conf, SummarizerConfiguration[] sumarizerConfigs) { + Map<String,String> props = SummarizerConfiguration.toTableProperties(sumarizerConfigs); + + for (Entry<String,String> entry : props.entrySet()) { + conf.set(enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." 
+ entry.getKey(), entry.getValue()); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java index de89137..de486d7 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -41,10 +42,12 @@ import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.client.admin.FindMax; import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.SummaryRetriever; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.impl.TableOperationsHelper; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -502,4 +505,28 @@ class MockTableOperations extends TableOperationsHelper { public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { throw new UnsupportedOperationException(); } + + @Override + 
public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + throw new UnsupportedOperationException(); + } + + @Override + public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + throw new UnsupportedOperationException(); + + } + + @Override + public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException, + AccumuloSecurityException { + throw new UnsupportedOperationException(); + + } + + @Override + public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index bc5995e..7c3f70e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -19,12 +19,18 @@ package org.apache.accumulo.core.client.rfile; import java.io.IOException; import java.io.OutputStream; +import java.util.Collection; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Predicate; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import 
org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; @@ -181,6 +187,119 @@ public class RFile { } /** + * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading summary data from an RFile. + * + * @since 2.0.0 + */ + public static interface SummaryInputArguments { + /** + * Specify RFiles to read from. When multiple inputs are specified the summary data will be merged. + * + * @param inputs + * one or more RFiles to read. + * @return this + */ + SummaryOptions from(RFileSource... inputs); + + /** + * Specify RFiles to read from. When multiple are specified the summary data will be merged. + * + * @param files + * one or more RFiles to read. + * @return this + */ + SummaryFSOptions from(String... files); + } + + /** + * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to read RFile summary data from. + * + * @since 2.0.0 + */ + public static interface SummaryFSOptions extends SummaryOptions { + /** + * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath. + * + * @param fs + * use this FileSystem to open files. + * @return this + */ + SummaryOptions withFileSystem(FileSystem fs); + } + + /** + * This is an intermediate interface in a large builder pattern. Allows setting options for retrieving summary data. + * + * @since 2.0.0 + */ + public static interface SummaryOptions { + /** + * This method allows retrieving a subset of summary data from a file. If a file has lots of separate summaries, reading a subset may be faster. + * + * @param summarySelector + * Only read summary data that was generated with configuration that this predicate matches. 
+ * @return this + */ + SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector); + + /** + * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a + * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation. + * + * @param startRow + * A non-null start row. The startRow is used exclusively. + * @return this + * + * @see FileStatistics#getExtra() + */ + SummaryOptions startRow(Text startRow); + + /** + * @param startRow + * UTF-8 encodes startRow. The startRow is used exclusively. + * @return this + * @see #startRow(Text) + */ + SummaryOptions startRow(CharSequence startRow); + + /** + * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a + * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation. + * + * @param endRow + * A non-null end row. The end row is used inclusively. + * @return this + * + * @see FileStatistics#getExtra() + */ + SummaryOptions endRow(Text endRow); + + /** + * @param endRow + * UTF-8 encodes endRow. The end row is used inclusively. + * @return this + * @see #endRow(Text) + */ + SummaryOptions endRow(CharSequence endRow); + + /** + * Reads summary data from file. + * + * @return The summary data in the file that satisfied the selection criteria. + */ + Collection<Summary> read() throws IOException; + } + + /** + * Entry point for reading summary data from RFiles. + * + * @since 2.0.0 + */ + public static SummaryInputArguments summaries() { + return new RFileSummariesRetriever(); + } + + /** * This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write a RFile to. 
* * @since 1.8.0 @@ -224,6 +343,16 @@ public class RFile { * @since 1.8.0 */ public static interface WriterOptions { + + /** + * Enable generating summary data in the created RFile by running {@link Summarizer}'s based on the specified configuration. + * + * @param summarizerConf + * Configuration for summarizer to run. + * @since 2.0.0 + */ + public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf); + /** * An option to store sample data in the generated RFile. * http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java index 3a55172..cfd331a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java @@ -112,12 +112,14 @@ class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOption @Override public ScannerOptions from(RFileSource... inputs) { + Objects.requireNonNull(inputs); opts.in = new InputArgs(inputs); return this; } @Override public ScannerFSOptions from(String... 
files) { + Objects.requireNonNull(files); opts.in = new InputArgs(files); return this; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java new file mode 100644 index 0000000..367172a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.function.Predicate; + +import org.apache.accumulo.core.client.rfile.RFile.SummaryFSOptions; +import org.apache.accumulo.core.client.rfile.RFile.SummaryInputArguments; +import org.apache.accumulo.core.client.rfile.RFile.SummaryOptions; +import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.summary.SummaryReader; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; + +class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions, SummaryOptions { + + private Predicate<SummarizerConfiguration> summarySelector = sc -> true; + private Text startRow; + private InputArgs in; + private Text endRow; + + @Override + public SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector) { + Objects.requireNonNull(summarySelector); + this.summarySelector = summarySelector; + return this; + } + + @Override + public SummaryOptions startRow(CharSequence startRow) { + return startRow(new Text(startRow.toString())); + } + + @Override + public SummaryOptions startRow(Text startRow) { + Objects.requireNonNull(startRow); + this.startRow = startRow; + return this; + } + + @Override + public SummaryOptions endRow(CharSequence endRow) { + return endRow(new Text(endRow.toString())); + } + + @Override + public SummaryOptions endRow(Text endRow) { + Objects.requireNonNull(endRow); + 
this.endRow = endRow; + return this; + } + + @Override + public Collection<Summary> read() throws IOException { + SummarizerFactory factory = new SummarizerFactory(); + AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration(); + Configuration conf = in.getFileSystem().getConf(); + + RFileSource[] sources = in.getSources(); + try { + SummaryCollection all = new SummaryCollection(); + for (RFileSource source : in.getSources()) { + SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory); + SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow))); + all.merge(sc, factory); + } + + return all.getSummaries(); + } finally { + for (RFileSource source : sources) { + source.getInputStream().close(); + } + } + } + + @Override + public SummaryOptions withFileSystem(FileSystem fs) { + Objects.requireNonNull(fs); + this.in.fs = fs; + return this; + } + + @Override + public SummaryOptions from(RFileSource... inputs) { + Objects.requireNonNull(inputs); + in = new InputArgs(inputs); + return this; + } + + @Override + public SummaryFSOptions from(String... 
files) { + Objects.requireNonNull(files); + in = new InputArgs(files); + return this; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index 9995888..9ae7fb0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -210,6 +210,32 @@ public class RFileWriter implements AutoCloseable { } /** + * This method has the same behavior as {@link #append(Key, Value)}. + * + * @param key + * Same restrictions on key as {@link #append(Key, Value)}. + * @param value + * this parameter will be UTF-8 encoded. Must be non-null. + * @since 2.0.0 + */ + public void append(Key key, CharSequence value) throws IOException { + append(key, new Value(value)); + } + + /** + * This method has the same behavior as {@link #append(Key, Value)}. + * + * @param key + * Same restrictions on key as {@link #append(Key, Value)}. + * @param value + * Must be non-null. + * @since 2.0.0 + */ + public void append(Key key, byte[] value) throws IOException { + append(key, new Value(value)); + } + + /** * Append the keys and values to the last locality group that was started. 
* * @param keyValues http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index 667cbef..a7decb1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -17,6 +17,8 @@ package org.apache.accumulo.core.client.rfile; +import static com.google.common.base.Preconditions.checkArgument; + import java.io.IOException; import java.io.OutputStream; import java.util.Collections; @@ -28,6 +30,7 @@ import java.util.Objects; import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions; import org.apache.accumulo.core.client.rfile.RFile.WriterOptions; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.file.FileOperations; @@ -59,15 +62,21 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions } private OutputArgs out; - private SamplerConfiguration sampler = null; private Map<String,String> tableConfig = Collections.emptyMap(); private int visCacheSize = 1000; + private Map<String,String> samplerProps = Collections.emptyMap(); + private Map<String,String> summarizerProps = Collections.emptyMap(); + + private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) { + checkArgument(Collections.disjoint(props.keySet(), derivedProps.keySet()), "Properties and derived %s properties are not disjoint", kind); + } @Override 
public WriterOptions withSampler(SamplerConfiguration samplerConf) { Objects.requireNonNull(samplerConf); - SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf); - this.sampler = samplerConf; + Map<String,String> tmp = new SamplerConfigurationImpl(samplerConf).toTablePropertiesMap(); + checkDisjoint(tableConfig, tmp, "sampler"); + this.samplerProps = tmp; return this; } @@ -76,10 +85,10 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions FileOperations fileops = FileOperations.getInstance(); AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration(); HashMap<String,String> userProps = new HashMap<>(); - if (sampler != null) { - userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap()); - } + userProps.putAll(tableConfig); + userProps.putAll(summarizerProps); + userProps.putAll(samplerProps); if (userProps.size() > 0) { acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet())); @@ -92,10 +101,11 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions } else { fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo")); } - return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize); + return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf) + .setAccumuloStartEnabled(false).build(), visCacheSize); } else { return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf) - .build(), visCacheSize); + .setAccumuloStartEnabled(false).build(), visCacheSize); } } @@ -128,7 +138,8 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions cfg.put(entry.getKey(), entry.getValue()); } - SamplerConfigurationImpl.checkDisjoint(cfg, sampler); + 
checkDisjoint(cfg, samplerProps, "sampler"); + checkDisjoint(cfg, summarizerProps, "summarizer"); this.tableConfig = cfg; return this; } @@ -145,4 +156,13 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions this.visCacheSize = maxSize; return this; } + + @Override + public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) { + Objects.requireNonNull(summarizerConf); + Map<String,String> tmp = SummarizerConfiguration.toTableProperties(summarizerConf); + checkDisjoint(tableConfig, tmp, "summarizer"); + this.summarizerProps = tmp; + return this; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java new file mode 100644 index 0000000..a0f9bc5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.client.summary; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This is a convenience class for interpreting summary data generated by implementations of {@link CountingSummarizer} + * + * @since 2.0.0 + */ + +public class CounterSummary { + private Map<String,Long> stats; + + /** + * This method will call {@link #CounterSummary(Summary, boolean)} with true. + */ + public CounterSummary(Summary summary) { + this(summary, true); + } + + /** + * @param summary + * a summary + * @param checkType + * If true will try to ensure the classname from {@link Summary#getSummarizerConfiguration()} is an instance of {@link CountingSummarizer}. However + * this check can only succeed if the class is on the classpath. For cases where the summary data needs to be used and the class is not on the + * classpath, set this to false. + */ + public CounterSummary(Summary summary, boolean checkType) { + if (checkType) { + String className = summary.getSummarizerConfiguration().getClassName(); + try { + getClass().getClassLoader().loadClass(className).asSubclass(CountingSummarizer.class); + } catch (ClassCastException e) { + throw new IllegalArgumentException(className + " is not an instance of " + CountingSummarizer.class.getSimpleName(), e); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Unable to check summary was produced by a " + CountingSummarizer.class.getSimpleName(), e); + } + } + this.stats = summary.getStatistics(); + } + + @VisibleForTesting + CounterSummary(Map<String,Long> stats) { + this.stats = stats; + } + + /** + * @return statistic for {@link CountingSummarizer#SEEN_STAT} + */ + public long getSeen() { + return stats.getOrDefault(CountingSummarizer.SEEN_STAT, 0l); + } + + /** + * @return statistic for {@link CountingSummarizer#EMITTED_STAT} + */ + public long getEmitted() { + return 
stats.getOrDefault(CountingSummarizer.EMITTED_STAT, 0l); + } + + /** + * @return the sum of {@link #getTooLong()} and {@link #getTooMany()} + */ + public long getIgnored() { + return getTooLong() + getTooMany(); + } + + /** + * @return statistic for {@link CountingSummarizer#TOO_LONG_STAT} + */ + public long getTooLong() { + return stats.getOrDefault(CountingSummarizer.TOO_LONG_STAT, 0l); + } + + /** + * @return statistic for {@link CountingSummarizer#TOO_MANY_STAT} + */ + public long getTooMany() { + return stats.getOrDefault(CountingSummarizer.TOO_MANY_STAT, 0l); + } + + /** + * @return statistic for {@link CountingSummarizer#DELETES_IGNORED_STAT} + */ + public long getDeletesIgnored() { + return stats.getOrDefault(CountingSummarizer.DELETES_IGNORED_STAT, 0l); + } + + /** + * @return All statistics with a prefix of {@link CountingSummarizer#COUNTER_STAT_PREFIX} with the prefix stripped off. + */ + public Map<String,Long> getCounters() { + HashMap<String,Long> ret = new HashMap<>(); + for (Entry<String,Long> entry : stats.entrySet()) { + if (entry.getKey().startsWith(CountingSummarizer.COUNTER_STAT_PREFIX)) { + ret.put(entry.getKey().substring(CountingSummarizer.COUNTER_STAT_PREFIX.length()), entry.getValue()); + } + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java new file mode 100644 index 0000000..b3e1b68 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.client.summary;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.commons.lang.mutable.MutableLong;

//checkstyle and formatter are in conflict
//@formatter:off
/**
 * This class counts arbitrary keys while defending against too many keys and keys that are too long.
 *
 * <p>
 * During collection and summarization this class will use the functions from {@link #converter()} and {@link #encoder()}. For each key/value the function from
 * {@link #converter()} will be called to create zero or more counter objects. A counter associated with each counter object will be incremented, as long as
 * there are not too many counters and the counter object is not too long.
 *
 * <p>
 * When {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)} is called, the function from {@link #encoder()} will be used to convert counter
 * objects to strings. These strings will be used to emit statistics. Overriding {@link #encoder()} is optional. One reason to override is if the counter object
 * contains binary or special data. For example, a function that base64 encodes counter objects could be created.
 *
 * <p>
 * If the counter key type is mutable, then consider overriding {@link #copier()}.
 *
 * <p>
 * The function returned by {@link #converter()} will be called frequently and should be very efficient. The function returned by {@link #encoder()} will be
 * called less frequently and can be more expensive. The reason these two functions exist is to avoid the conversion to string for each key value, if that
 * conversion is unnecessary.
 *
 * <p>
 * Below is an example implementation that counts column visibilities. This example avoids converting column visibility to string for each key/value. This
 * example shows the source code for {@link VisibilitySummarizer}.
 *
 * <pre>
 * <code>
 *   public class VisibilitySummarizer extends CountingSummarizer&lt;ByteSequence&gt; {
 *     &#064;Override
 *     protected UnaryOperator&lt;ByteSequence&gt; copier() {
 *       // ByteSequences are mutable, so override and provide a copy function
 *       return ArrayByteSequence::new;
 *     }
 *
 *     &#064;Override
 *     protected Converter&lt;ByteSequence&gt; converter() {
 *       return (key, val, consumer) -&gt; consumer.accept(key.getColumnVisibilityData());
 *     }
 *   }
 * </code>
 * </pre>
 *
 * @param <K>
 *          The counter key type. This type must have good implementations of {@link Object#hashCode()} and {@link Object#equals(Object)}.
 * @see CounterSummary
 * @since 2.0.0
 */
//@formatter:on
public abstract class CountingSummarizer<K> implements Summarizer {

  /**
   * A configuration option for specifying the maximum number of unique counters an instance of this summarizer should track. If not specified, a default of
   * {@value #MAX_COUNTER_DEFAULT} will be used.
   */
  public static final String MAX_COUNTERS_OPT = "maxCounters";

  /**
   * A configuration option for specifying the maximum length of an individual counter key. If not specified, a default of {@value #MAX_CKL_DEFAULT} will be
   * used.
   */
  public static final String MAX_COUNTER_LEN_OPT = "maxCounterLen";

  /**
   * A configuration option to determine if delete keys should be counted. If set to true then delete keys will not be passed to the {@link Converter} and the
   * statistic {@value #DELETES_IGNORED_STAT} will track the number of deletes ignored. This option defaults to {@value #INGNORE_DELETES_DEFAULT}.
   */
  public static final String INGNORE_DELETES_OPT = "ignoreDeletes";

  /**
   * This prefixes all counters when emitting statistics in {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)}.
   */
  public static final String COUNTER_STAT_PREFIX = "c:";

  /**
   * This is the name of the statistic that tracks how many counter objects were ignored because the number of unique counters was exceeded. The max number of
   * unique counters is specified by {@link #MAX_COUNTERS_OPT}.
   */
  public static final String TOO_MANY_STAT = "tooMany";

  /**
   * This is the name of the statistic that tracks how many counter objects were ignored because they were too long. The maximum length is specified by
   * {@link #MAX_COUNTER_LEN_OPT}.
   */
  public static final String TOO_LONG_STAT = "tooLong";

  /**
   * This is the name of the statistic that tracks the total number of counter objects emitted by the {@link Converter}. This includes emitted Counter objects
   * that were ignored.
   */
  public static final String EMITTED_STAT = "emitted";

  /**
   * This is the name of the statistic that tracks the total number of deleted keys seen. This statistic is only incremented when the
   * {@value #INGNORE_DELETES_OPT} option is set to true.
   */
  public static final String DELETES_IGNORED_STAT = "deletesIgnored";

  /**
   * This tracks the total number of key/values seen by the {@link Summarizer.Collector}
   */
  public static final String SEEN_STAT = "seen";

  // this default can not be changed as persisted summary data depends on it. See the documentation about persistence in the Summarizer class javadoc.
  public static final String MAX_COUNTER_DEFAULT = "1024";

  // this default can not be changed as persisted summary data depends on it
  public static final String MAX_CKL_DEFAULT = "128";

  // this default can not be changed as persisted summary data depends on it
  public static final String INGNORE_DELETES_DEFAULT = "true";

  private static final String[] ALL_STATS = new String[] {TOO_LONG_STAT, TOO_MANY_STAT, EMITTED_STAT, SEEN_STAT, DELETES_IGNORED_STAT};

  /**
   * Immutable snapshot of the options parsed from one {@link SummarizerConfiguration}. Each collector/combiner gets its own snapshot so that a later call to
   * {@link #collector(SummarizerConfiguration)} or {@link #combiner(SummarizerConfiguration)} with different options cannot change the limits of an object
   * that was already handed out.
   */
  private static final class Options {
    final int maxCounters;
    final int maxCounterKeyLen;
    final boolean ignoreDeletes;

    Options(SummarizerConfiguration conf) {
      // Integer.parseInt throws NumberFormatException for a malformed numeric option
      maxCounters = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTERS_OPT, MAX_COUNTER_DEFAULT));
      maxCounterKeyLen = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTER_LEN_OPT, MAX_CKL_DEFAULT));
      ignoreDeletes = Boolean.parseBoolean(conf.getOptions().getOrDefault(INGNORE_DELETES_OPT, INGNORE_DELETES_DEFAULT));
    }
  }

  /**
   * A function that converts key values to zero or more counter objects.
   *
   * @since 2.0.0
   */
  @FunctionalInterface
  public interface Converter<K> {
    /**
     * @param k
     *          the key to derive counter objects from
     * @param v
     *          the value to derive counter objects from
     * @param consumer
     *          emit counter objects derived from key and value to this consumer
     */
    void convert(Key k, Value v, Consumer<K> consumer);
  }

  /**
   *
   * @return A function that is used to convert each key value to zero or more counter objects. Each function returned should be independent.
   */
  protected abstract Converter<K> converter();

  /**
   * @return A function that is used to convert counter objects to String. The default function calls {@link Object#toString()} on the counter object.
   */
  protected Function<K,String> encoder() {
    return Object::toString;
  }

  /**
   * Override this if your key type is mutable and subject to change.
   *
   * @return a function that is used to copy the counter object. This function is only used when the collector has never seen the counter object before. In
   *         this case the collector needs to possibly copy the counter object before using as map key. The default implementation is the
   *         {@link UnaryOperator#identity()} function.
   */
  protected UnaryOperator<K> copier() {
    return UnaryOperator.identity();
  }

  /**
   * Creates a collector that counts the counter objects produced by {@link #converter()} for each key/value it is given, subject to the limits configured
   * in {@code sc}.
   *
   * @param sc
   *          configuration supplying {@link #MAX_COUNTERS_OPT}, {@link #MAX_COUNTER_LEN_OPT} and {@link #INGNORE_DELETES_OPT}; missing options use the
   *          documented defaults
   */
  @Override
  public Collector collector(SummarizerConfiguration sc) {
    final Options opts = new Options(sc);
    return new Collector() {

      // The map used for computing the summary incrementally is keyed by the counter object itself, which is more efficient than converting it to a
      // String for each Key. The conversion to String is deferred until the summary is requested.
      private final Map<K,MutableLong> counters = new HashMap<>();
      private long tooMany = 0;
      private long tooLong = 0;
      private long seen = 0;
      private long emitted = 0;
      private long deleted = 0;
      private final Converter<K> converter = converter();
      private final Function<K,String> encoder = encoder();
      private final UnaryOperator<K> copier = copier();

      private void incrementCounter(K counter) {
        emitted++;

        MutableLong ml = counters.get(counter);
        if (ml == null) {
          if (counters.size() >= opts.maxCounters) {
            // no need to store this counter in the map and get() it... just use instance variable
            tooMany++;
          } else {
            // we have never seen this key before, check if it's too long. Note that an encoded length equal to the configured maximum is treated as
            // too long; persisted summary data depends on this, so it must not change.
            if (encoder.apply(counter).length() >= opts.maxCounterKeyLen) {
              tooLong++;
            } else {
              counters.put(copier.apply(counter), new MutableLong(1));
            }
          }
        } else {
          // using mutable long allows calling put() to be avoided
          ml.increment();
        }
      }

      @Override
      public void accept(Key k, Value v) {
        seen++;
        if (opts.ignoreDeletes && k.isDeleted()) {
          deleted++;
        } else {
          converter.convert(k, v, this::incrementCounter);
        }
      }

      @Override
      public void summarize(StatisticConsumer sc) {
        // reuse one builder; it is truncated back to the prefix before each counter name is appended
        StringBuilder sb = new StringBuilder(COUNTER_STAT_PREFIX);

        for (Entry<K,MutableLong> entry : counters.entrySet()) {
          sb.setLength(COUNTER_STAT_PREFIX.length());
          sb.append(encoder.apply(entry.getKey()));
          sc.accept(sb.toString(), entry.getValue().longValue());
        }

        sc.accept(TOO_MANY_STAT, tooMany);
        sc.accept(TOO_LONG_STAT, tooLong);
        sc.accept(EMITTED_STAT, emitted);
        sc.accept(SEEN_STAT, seen);
        sc.accept(DELETES_IGNORED_STAT, deleted);
      }
    };
  }

  /**
   * Creates a combiner that merges the statistics of a second summary into a first one, summing the fixed statistics and all counters, and then trimming
   * the least frequent counters when more than the configured maximum remain.
   *
   * @param sc
   *          configuration supplying {@link #MAX_COUNTERS_OPT}; missing options use the documented defaults
   */
  @Override
  public Combiner combiner(SummarizerConfiguration sc) {
    final Options opts = new Options(sc);
    return new Combiner() {

      @Override
      public void merge(Map<String,Long> summary1, Map<String,Long> summary2) {

        // after this loop every fixed statistic is present in summary1 (absent ones are created with the value from summary2, or zero)
        for (String key : ALL_STATS) {
          summary1.merge(key, summary2.getOrDefault(key, 0L), Long::sum);
        }

        for (Entry<String,Long> entry : summary2.entrySet()) {
          String k2 = entry.getKey();
          Long v2 = entry.getValue();

          if (k2.startsWith(COUNTER_STAT_PREFIX)) {
            summary1.merge(k2, v2, Long::sum);
          }
        }

        if (summary1.size() - ALL_STATS.length > opts.maxCounters) {
          // find the keys with the lowest counts to remove
          List<String> keysToRemove = summary1.entrySet().stream().filter(e -> e.getKey().startsWith(COUNTER_STAT_PREFIX)) // filter out non counters
              .sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue())) // sort descending by count
              .skip(opts.maxCounters) // skip most frequent
              .map(Entry::getKey).collect(Collectors.toList()); // collect the least frequent counters in a list

          long removedCount = 0;
          for (String key : keysToRemove) {
            removedCount += summary1.remove(key);
          }

          // the counts of the dropped counters are accounted for as "too many"
          summary1.merge(TOO_MANY_STAT, removedCount, Long::sum);
        }
      }
    };
  }
}