http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java new file mode 100644 index 0000000..cc688c9 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+import org.apache.accumulo.core.data.thrift.TSummary;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class facilitates merging, storing, and serializing (to/from thrift) intermediate summary information.
+ *
+ * <p>
+ * Instances are mutable and perform no synchronization. Per-configuration state taken from another collection during a merge is copied, so merging into
+ * this collection never modifies the other collection.
+ */
+public class SummaryCollection {
+
+  /**
+   * Summary statistics for one summarizer configuration, accumulated over one or more files, plus counters describing those files.
+   */
+  private static class MergedSummary {
+    Map<String,Long> summary;
+    long filesContaining;
+    long filesExceedingBoundry;
+    long filesLarge;
+
+    public MergedSummary(FileSummary entry) {
+      // copy so that merging into this summary never mutates the map owned by the FileSummary
+      this.summary = copyOf(entry.summary);
+      this.filesContaining = 1;
+      this.filesExceedingBoundry = entry.exceededBoundry ? 1 : 0;
+      this.filesLarge = entry.exceededMaxSize ? 1 : 0;
+    }
+
+    public MergedSummary(TSummary tSummary) {
+      this.summary = new HashMap<>(tSummary.getSummary());
+      this.filesContaining = tSummary.getFilesContaining();
+      this.filesExceedingBoundry = tSummary.getFilesExceeding();
+      this.filesLarge = tSummary.getFilesLarge();
+    }
+
+    /**
+     * Creates an independent copy of {@code other}; merging into the copy leaves {@code other} unchanged.
+     */
+    public MergedSummary(MergedSummary other) {
+      this.summary = copyOf(other.summary);
+      this.filesContaining = other.filesContaining;
+      this.filesExceedingBoundry = other.filesExceedingBoundry;
+      this.filesLarge = other.filesLarge;
+    }
+
+    /** Returns a mutable copy of {@code map}, or null when {@code map} is null. */
+    private static Map<String,Long> copyOf(Map<String,Long> map) {
+      return map == null ? null : new HashMap<>(map);
+    }
+
+    /**
+     * Folds {@code other} into this summary. Statistics are combined using the combiner of the summarizer for {@code config}; file counters are added.
+     */
+    public void merge(MergedSummary other, SummarizerConfiguration config, SummarizerFactory factory) {
+
+      if (summary == null && other.summary != null) {
+        summary = new HashMap<>(other.summary);
+      } else if (summary != null && other.summary != null) {
+        Summarizer summarizer = factory.getSummarizer(config);
+        summarizer.combiner(config).merge(summary, other.summary);
+      }
+
+      filesContaining += other.filesContaining;
+      filesExceedingBoundry += other.filesExceedingBoundry;
+      filesLarge += other.filesLarge;
+    }
+
+    public TSummary toThrift(SummarizerConfiguration key) {
+      TSummarizerConfiguration tsumConf = SummarizerConfigurationUtil.toThrift(key);
+      return new TSummary(summary, tsumConf, filesContaining, filesExceedingBoundry, filesLarge);
+    }
+
+  }
+
+  private final Map<SummarizerConfiguration,MergedSummary> mergedSummaries;
+  private long totalFiles;
+  private long deletedFiles;
+
+  /** Creates an empty collection covering zero files. */
+  public SummaryCollection() {
+    mergedSummaries = new HashMap<>();
+    totalFiles = 0;
+    deletedFiles = 0;
+  }
+
+  /** Recreates a collection from its thrift form, as produced by {@link #toThrift()}. */
+  public SummaryCollection(TSummaries tsums) {
+    mergedSummaries = new HashMap<>();
+    for (TSummary tSummary : tsums.getSummaries()) {
+      SummarizerConfiguration sconf = SummarizerConfigurationUtil.fromThrift(tSummary.getConfig());
+      mergedSummaries.put(sconf, new MergedSummary(tSummary));
+    }
+
+    totalFiles = tsums.getTotalFiles();
+    deletedFiles = tsums.getDeletedFiles();
+  }
+
+  SummaryCollection(Collection<FileSummary> initialEntries) {
+    this(initialEntries, false);
+  }
+
+  /**
+   * Creates a collection describing a single file.
+   *
+   * @param initialEntries
+   *          the per-configuration summaries of the file
+   * @param deleted
+   *          true when the file was deleted, in which case {@code initialEntries} must be empty
+   */
+  SummaryCollection(Collection<FileSummary> initialEntries, boolean deleted) {
+    if (deleted) {
+      Preconditions.checkArgument(initialEntries.isEmpty());
+    }
+    mergedSummaries = new HashMap<>();
+    for (FileSummary entry : initialEntries) {
+      mergedSummaries.put(entry.conf, new MergedSummary(entry));
+    }
+    totalFiles = 1;
+    this.deletedFiles = deleted ? 1 : 0;
+  }
+
+  /** The summary computed from one file for one summarizer configuration. */
+  static class FileSummary {
+
+    private SummarizerConfiguration conf;
+    private Map<String,Long> summary;
+    private boolean exceededBoundry;
+    private boolean exceededMaxSize;
+
+    FileSummary(SummarizerConfiguration conf, Map<String,Long> summary, boolean exceededBoundry) {
+      this.conf = conf;
+      this.summary = summary;
+      this.exceededBoundry = exceededBoundry;
+      this.exceededMaxSize = false;
+    }
+
+    /** Creates an entry for a file whose summary data was not stored because it exceeded the maximum size. */
+    FileSummary(SummarizerConfiguration conf) {
+      this.conf = conf;
+      this.summary = new HashMap<>();
+      this.exceededBoundry = false;
+      this.exceededMaxSize = true;
+    }
+  }
+
+  /**
+   * Merges {@code other} into this collection. {@code other} is not modified, neither now nor by later merges into this collection.
+   */
+  public void merge(SummaryCollection other, SummarizerFactory factory) {
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : other.mergedSummaries.entrySet()) {
+      MergedSummary ms = mergedSummaries.get(entry.getKey());
+      if (ms == null) {
+        // Copy instead of sharing: later merges into this collection mutate the stored MergedSummary, which must not leak back into other.
+        mergedSummaries.put(entry.getKey(), new MergedSummary(entry.getValue()));
+      } else {
+        ms.merge(entry.getValue(), entry.getKey(), factory);
+      }
+    }
+
+    this.totalFiles += other.totalFiles;
+    this.deletedFiles += other.deletedFiles;
+  }
+
+  /** Returns a new collection holding the merge of {@code sc1} and {@code sc2}; neither argument is modified. */
+  public static SummaryCollection merge(SummaryCollection sc1, SummaryCollection sc2, SummarizerFactory factory) {
+    SummaryCollection ret = new SummaryCollection();
+    ret.merge(sc1, factory);
+    ret.merge(sc2, factory);
+    return ret;
+  }
+
+  /** Converts the merged data into one {@link Summary} per summarizer configuration. */
+  public List<Summary> getSummaries() {
+    ArrayList<Summary> ret = new ArrayList<>(mergedSummaries.size());
+
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet()) {
+      SummarizerConfiguration config = entry.getKey();
+      MergedSummary ms = entry.getValue();
+
+      // non-deleted files that contributed no summary for this configuration
+      long filesMissing = (totalFiles - deletedFiles) - ms.filesContaining;
+      ret.add(new Summary(ms.summary, config, totalFiles, filesMissing, ms.filesExceedingBoundry, ms.filesLarge, deletedFiles));
+    }
+
+    return ret;
+  }
+
+  public long getTotalFiles() {
+    return totalFiles;
+  }
+
+  /** Converts this collection to its thrift form; the inverse of {@link #SummaryCollection(TSummaries)}. */
+  public TSummaries toThrift() {
+    List<TSummary> summaries = new ArrayList<>(mergedSummaries.size());
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet()) {
+      summaries.add(entry.getValue().toThrift(entry.getKey()));
+    }
+
+    // NOTE(review): the first two arguments look like a "finished" flag and a session id -- confirm against the TSummaries thrift definition
+    return new TSummaries(true, -1L, totalFiles, deletedFiles, summaries);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java new file mode 100644 index 0000000..7b9ebe4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Statistics gathered for a run of consecutive key values. Each instance holds the row of the run's last key and the number of key values in the run.
+ */
+class SummaryInfo {
+
+  final Map<String,Long> summary;
+  final Text lastRow;
+  final int count;
+
+  /** Builds an instance whose last row is given as raw bytes, wrapped in a new {@link Text}. */
+  SummaryInfo(byte[] row, Map<String,Long> summary, int count) {
+    this(new Text(row), summary, count);
+  }
+
+  /** Builds an instance that keeps a reference to {@code row} (no copy is made). */
+  SummaryInfo(Text row, Map<String,Long> summary, int count) {
+    this.summary = summary;
+    this.lastRow = row;
+    this.count = count;
+  }
+
+  /** @return the row of the last key value covered by this summary */
+  Text getLastRow() {
+    return lastRow;
+  }
+
+  /** @return the statistics, keyed by statistic name */
+  Map<String,Long> getSummary() {
+    return summary;
+  }
+
+  /** @return how many key values this summary covers */
+  int getCount() {
+    return count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
new file mode 100644
index 0000000..9b2b5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Loads the summaries stored in a file's meta blocks and computes, from them, summary information for row ranges.
+ */
+public class SummaryReader {
+
+  /** Source of named meta blocks; lets the same loading code work over a block file reader or an RFile iterator. */
+  @FunctionalInterface
+  private interface BlockReader {
+    DataInputStream getMetaBlock(String name) throws IOException;
+  }
+
+  /**
+   * Block cache that stores blocks only in the summary cache, but also consults the index cache when a lookup misses the summary cache.
+   */
+  private static class CompositeCache implements BlockCache {
+
+    private BlockCache summaryCache;
+    private BlockCache indexCache;
+
+    CompositeCache(BlockCache summaryCache, BlockCache indexCache) {
+      this.summaryCache = summaryCache;
+      this.indexCache = indexCache;
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf) {
+      return summaryCache.cacheBlock(blockName, buf);
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+      return summaryCache.cacheBlock(blockName, buf, inMemory);
+    }
+
+    @Override
+    public CacheEntry getBlock(String blockName) {
+      CacheEntry ce = summaryCache.getBlock(blockName);
+      if (ce == null) {
+        // It's possible the index cache may have this info, so check there. This is an opportunistic check.
+        ce = indexCache.getBlock(blockName);
+      }
+      return ce;
+    }
+
+    @Override
+    public long getMaxSize() {
+      return summaryCache.getMaxSize();
+    }
+
+    @Override
+    public Stats getStats() {
+      return summaryCache.getStats();
+    }
+  }
+
+  /**
+   * Reads the summary index and loads every summary whose configuration is accepted by {@code summarySelector}.
+   *
+   * @return the selected summaries, or an empty list when the file has no summary index
+   */
+  private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration> summarySelector) throws IOException {
+
+    try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) {
+      List<SummarySerializer> stores = new ArrayList<>();
+
+      readHeader(in);
+      int numSummaries = WritableUtils.readVInt(in);
+      for (int i = 0; i < numSummaries; i++) {
+        SummarizerConfiguration conf = readConfig(in);
+        boolean inline = in.readBoolean();
+        if (inline) {
+          if (summarySelector.test(conf)) {
+            stores.add(SummarySerializer.load(conf, in));
+          } else {
+            SummarySerializer.skip(in);
+          }
+        } else {
+          int block = WritableUtils.readVInt(in);
+          int offset = WritableUtils.readVInt(in);
+          if (summarySelector.test(conf)) {
+            try (DataInputStream summaryIn = bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX + "." + block)) {
+              // block and offset locate the summary, so the offset is skipped within the summary block. Skipping it in the index stream (in) would
+              // load the summary from the wrong position and misalign every index entry read after this one.
+              skipFully(summaryIn, offset);
+              stores.add(SummarySerializer.load(conf, summaryIn));
+            } catch (MetaBlockDoesNotExist e) {
+              // this is unexpected
+              throw new IOException(e);
+            }
+          }
+        }
+      }
+
+      return stores;
+    } catch (MetaBlockDoesNotExist e) {
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Skips exactly {@code len} bytes of {@code in}.
+   *
+   * @throws EOFException
+   *           if the stream ends before {@code len} bytes were skipped
+   */
+  private static void skipFully(DataInputStream in, long len) throws IOException {
+    long remaining = len;
+    while (remaining > 0) {
+      long skipped = in.skip(remaining);
+      if (skipped <= 0) {
+        // skip() may return 0 without being at EOF; read one byte to tell the two cases apart instead of looping forever
+        if (in.read() == -1) {
+          throw new EOFException("Expected to skip " + len + " bytes but the stream ended after " + (len - remaining));
+        }
+        skipped = 1;
+      }
+      remaining -= skipped;
+    }
+  }
+
+  private static SummaryReader load(CachableBlockFile.Reader bcReader, Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory)
+      throws IOException {
+    SummaryReader fileSummaries = new SummaryReader();
+    fileSummaries.summaryStores = load(name -> bcReader.getMetaBlock(name), summarySelector);
+    fileSummaries.factory = factory;
+    return fileSummaries;
+  }
+
+  /**
+   * Loads summaries from an input stream holding a complete file of {@code length} bytes. {@code inputStream} must also implement {@link Seekable}.
+   */
+  public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
+      Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory) throws IOException {
+    // NOTE(review): bcReader is never closed here, unlike the FileSystem overload -- presumably the caller owns the stream; confirm
+    CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader((InputStream & Seekable) inputStream, length, conf, aConf);
+    return load(bcReader, summarySelector, factory);
+  }
+
+  /**
+   * Loads summaries from {@code file}. If the file does not exist, a reader is returned that reports the file as deleted.
+   *
+   * @throws UncheckedIOException
+   *           if reading fails for any other reason
+   */
+  public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf, SummarizerFactory factory, Path file,
+      Predicate<SummarizerConfiguration> summarySelector, BlockCache summaryCache, BlockCache indexCache) {
+    CachableBlockFile.Reader bcReader = null;
+
+    try {
+      // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when only summary data is wanted.
+      CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf);
+      return load(bcReader, summarySelector, factory);
+    } catch (FileNotFoundException fne) {
+      return deletedFileReader(factory);
+    } catch (IOException e) {
+      try {
+        if (!fs.exists(file)) {
+          // the file no longer exists, so treat it the same as the FileNotFoundException case
+          return deletedFileReader(factory);
+        }
+      } catch (IOException e1) {
+        // could not determine whether the file exists; report the original failure and keep this one for diagnosis
+        e.addSuppressed(e1);
+      }
+      throw new UncheckedIOException(e);
+    } finally {
+      if (bcReader != null) {
+        try {
+          bcReader.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+
+  }
+
+  /** Creates a reader with no summaries that reports its file as deleted. */
+  private static SummaryReader deletedFileReader(SummarizerFactory factory) {
+    SummaryReader sr = new SummaryReader();
+    sr.factory = factory;
+    sr.summaryStores = Collections.emptyList();
+    sr.deleted = true;
+    return sr;
+  }
+
+  private static void print(FileSKVIterator fsi, String indent, PrintStream out) throws IOException {
+
+    out.printf("Summary data : \n");
+
+    List<SummarySerializer> stores = load(name -> fsi.getMetaStore(name), conf -> true);
+    int i = 1;
+    for (SummarySerializer summaryStore : stores) {
+      out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(), summaryStore.getSummarizerConfiguration());
+      i++;
+      summaryStore.print(indent, indent, out);
+    }
+  }
+
+  /** Prints all summary data stored in the RFile read by {@code iter}. */
+  public static void print(Reader iter, PrintStream out) throws IOException {
+    print(iter, " ", out);
+  }
+
+  /** Reads a summarizer configuration: class name, property id, and a counted list of option key/value pairs. */
+  private static SummarizerConfiguration readConfig(DataInputStream in) throws IOException {
+    // read summarizer configuration
+    String summarizerClazz = in.readUTF();
+    String configId = in.readUTF();
+    SummarizerConfiguration.Builder scb = SummarizerConfiguration.builder(summarizerClazz).setPropertyId(configId);
+    int numOpts = WritableUtils.readVInt(in);
+    for (int i = 0; i < numOpts; i++) {
+      String k = in.readUTF();
+      String v = in.readUTF();
+      scb.addOption(k, v);
+    }
+
+    return scb.build();
+  }
+
+  /**
+   * Validates the magic number and version at the start of the summary index.
+   *
+   * @throws IOException
+   *           if either does not match what {@link SummaryWriter} writes
+   */
+  private static byte readHeader(DataInputStream in) throws IOException {
+    long magic = in.readLong();
+    if (magic != SummaryWriter.MAGIC) {
+      throw new IOException("Bad magic : " + String.format("%x", magic));
+    }
+
+    byte ver = in.readByte();
+    if (ver != SummaryWriter.VER) {
+      throw new IOException("Unknown version : " + ver);
+    }
+
+    return ver;
+  }
+
+  private List<SummarySerializer> summaryStores;
+
+  private SummarizerFactory factory;
+
+  private boolean deleted;
+
+  /**
+   * Computes, per summarizer configuration, the summary of this file's data that overlaps {@code ranges}.
+   */
+  public SummaryCollection getSummaries(List<RowRange> ranges) {
+
+    List<SummaryCollection.FileSummary> initial = new ArrayList<>();
+    if (deleted) {
+      return new SummaryCollection(initial, true);
+    }
+    for (SummarySerializer summaryStore : summaryStores) {
+      if (summaryStore.exceededMaxSize()) {
+        // no summary data was stored for this configuration; only record that it was too large
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
+      } else {
+        Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
+        boolean exceeded = summaryStore.exceedsRange(ranges);
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(), summary, exceeded));
+      }
+    }
+
+    return new SummaryCollection(initial);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
new file mode 100644
index 0000000..d76bd1a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.Summarizer.Collector;
+import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * This class supports serializing summaries and periodically storing summaries. The implementation attempts to generate around 10 summaries that are evenly
+ * spaced. This allows asking for summaries for sub-ranges of data in an rfile.
+ *
+ * <p>
+ * At first summaries are created for every 1000 key values. Once there are more than 10 summaries (i.e. 12), adjacent pairs are merged, leaving 6, and
+ * summaries are then created for every 2000 key values. The code keeps merging summaries and doubling the amount of key values per summary. This results in
+ * each summary covering about the same number of key values.
+ *
+ */
+
+class SummarySerializer {
+
+  private SummarizerConfiguration sconf;
+  private LgSummaries[] allSummaries;
+
+  private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] allSummaries) {
+    this.sconf = sconf;
+    this.allSummaries = allSummaries;
+  }
+
+  private SummarySerializer(SummarizerConfiguration sconf) {
+    this.sconf = sconf;
+    // this indicates max size was exceeded
+    this.allSummaries = null;
+  }
+
+  public SummarizerConfiguration getSummarizerConfiguration() {
+    return sconf;
+  }
+
+  public void print(String prefix, String indent, PrintStream out) {
+
+    if (allSummaries == null) {
+      out.printf("%sSummary not stored because it was too large\n", prefix + indent);
+    } else {
+      for (LgSummaries lgs : allSummaries) {
+        lgs.print(prefix, indent, out);
+      }
+    }
+  }
+
+  /**
+   * Combines the stored summaries, across all locality groups, that overlap any of {@code ranges}. No data is stored when {@link #exceededMaxSize()} is
+   * true, so callers must check that first.
+   */
+  public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory sf) {
+
+    Summarizer kvs = sf.getSummarizer(sconf);
+
+    Map<String,Long> summary = new HashMap<>();
+    for (LgSummaries lgs : allSummaries) {
+      lgs.getSummary(ranges, kvs.combiner(sconf), summary);
+    }
+    return summary;
+  }
+
+  /**
+   * Returns true if any locality group has data both inside and outside of any of {@code ranges}. Like {@link #getSummary(List, SummarizerFactory)}, must
+   * not be called when {@link #exceededMaxSize()} is true.
+   */
+  public boolean exceedsRange(List<RowRange> ranges) {
+    for (LgSummaries lgs : allSummaries) {
+      for (RowRange ke : ranges) {
+        if (lgs.exceedsRange(ke.getStartRow(), ke.getEndRow())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public boolean exceededMaxSize() {
+    return allSummaries == null;
+  }
+
+  /** Captures the statistics a collector emits into a map. */
+  private static class SummaryStoreImpl implements Summarizer.StatisticConsumer {
+
+    HashMap<String,Long> summaries;
+
+    @Override
+    public void accept(String summary, long value) {
+      summaries.put(summary, value);
+    }
+  }
+
+  /** Builds the periodic summaries for a single locality group. */
+  private static class LgBuilder {
+    private Summarizer summarizer;
+    private SummarizerConfiguration conf;
+    private Collector collector;
+
+    private int maxSummaries = 10;
+
+    private int cutoff = 1000;
+    private int count = 0;
+
+    private List<SummaryInfo> summaries = new ArrayList<>();
+
+    private Key lastKey;
+
+    private SummaryStoreImpl sci = new SummaryStoreImpl();
+
+    private String name;
+
+    private boolean sawFirst = false;
+    private Text firstRow;
+
+    private boolean finished = false;
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
+      this(conf, kvs, "<DEFAULT>");
+    }
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs, String name) {
+      this.conf = conf;
+      this.summarizer = kvs;
+      this.name = name;
+      this.collector = kvs.collector(conf);
+    }
+
+    public void put(Key k, Value v) {
+      collector.accept(k, v);
+      count++;
+
+      if (!sawFirst) {
+        firstRow = k.getRow();
+        sawFirst = true;
+      }
+
+      if (count >= cutoff) {
+        // close out the current summary and start collecting a new one
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = summarizer.collector(conf);
+        addSummary(k.getRow(), sci.summaries, count);
+        count = 0;
+      }
+
+      lastKey = k;
+    }
+
+    /**
+     * Merges adjacent pairs of summaries in positions [0, end); {@code end} must be even. Each merged summary keeps the last row of its second member.
+     */
+    private List<SummaryInfo> merge(int end) {
+      Combiner combiner = summarizer.combiner(conf);
+      List<SummaryInfo> mergedSummaries = new ArrayList<>();
+      for (int i = 0; i < end; i += 2) {
+        SummaryInfo first = summaries.get(i);
+        SummaryInfo second = summaries.get(i + 1);
+        combiner.merge(first.summary, second.summary);
+        mergedSummaries.add(new SummaryInfo(second.getLastRow(), first.summary, first.count + second.count));
+      }
+      return mergedSummaries;
+    }
+
+    private void addSummary(Text row, Map<String,Long> summary, int count) {
+      Preconditions.checkState(!finished);
+      summaries.add(new SummaryInfo(row, summary, count));
+
+      if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
+        summaries = merge(summaries.size());
+        cutoff *= 2;
+      }
+    }
+
+    /**
+     * Merges adjacent summaries pairwise to shrink the serialized size; an odd trailing summary is kept unmerged.
+     *
+     * @return false if there was nothing to merge
+     */
+    boolean collapse() {
+      Preconditions.checkState(finished);
+      if (summaries.size() <= 1) {
+        return false;
+      }
+
+      int end = summaries.size();
+      if (end % 2 == 1) {
+        end--;
+      }
+
+      List<SummaryInfo> mergedSummaries = merge(end);
+
+      if (summaries.size() % 2 == 1) {
+        mergedSummaries.add(summaries.get(summaries.size() - 1));
+      }
+
+      summaries = mergedSummaries;
+
+      return true;
+    }
+
+    void finish() {
+      Preconditions.checkState(!finished);
+      // summarize last data
+      if (count > 0) {
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = null;
+        addSummary(lastKey.getRow(), sci.summaries, count);
+        count = 0;
+      }
+      // Must be set even when nothing was pending (an empty locality group, or one whose last put() just closed a summary); otherwise collapse() fails
+      // its precondition. Set only after addSummary(), which requires !finished.
+      finished = true;
+    }
+
+    public void save(DataOutputStream dos, Map<String,Integer> symbolTable) throws IOException {
+      Preconditions.checkState(count == 0);
+
+      dos.writeUTF(name);
+
+      if (firstRow == null) {
+        // a zero length, i.e. the same bytes an empty Text would write
+        WritableUtils.writeVInt(dos, 0);
+      } else {
+        firstRow.write(dos);
+      }
+
+      // write summaries
+      WritableUtils.writeVInt(dos, summaries.size());
+      for (SummaryInfo summaryInfo : summaries) {
+        summaryInfo.getLastRow().write(dos);
+        WritableUtils.writeVInt(dos, summaryInfo.count);
+        saveSummary(dos, symbolTable, summaryInfo.summary);
+      }
+    }
+
+    /** Writes a summary with each statistic name replaced by its index in {@code symbolTable}. */
+    private void saveSummary(DataOutputStream dos, Map<String,Integer> symbolTable, Map<String,Long> summary) throws IOException {
+      WritableUtils.writeVInt(dos, summary.size());
+      for (Entry<String,Long> e : summary.entrySet()) {
+        WritableUtils.writeVInt(dos, symbolTable.get(e.getKey()));
+        WritableUtils.writeVLong(dos, e.getValue());
+      }
+    }
+  }
+
+  /** Accumulates key values, per locality group, and serializes the resulting summaries. */
+  public static class Builder {
+    private Summarizer kvs;
+
+    private SummarizerConfiguration conf;
+
+    private List<LgBuilder> locGroups;
+    private LgBuilder lgb;
+
+    private long maxSize;
+
+    public Builder(SummarizerConfiguration conf, Summarizer kvs, long maxSize) {
+      this.conf = conf;
+      this.kvs = kvs;
+      this.locGroups = new ArrayList<>();
+      this.maxSize = maxSize;
+    }
+
+    public void put(Key k, Value v) {
+      lgb.put(k, v);
+    }
+
+    public SummarizerConfiguration getSummarizerConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Writes the summaries. Writes true alone if the data still exceeds {@code maxSize} after collapsing as far as possible; otherwise writes false, the
+     * data length, and the data.
+     */
+    public void save(DataOutputStream dos) throws IOException {
+
+      finishCurrentLocalityGroup();
+
+      byte[] data = toBytes();
+
+      // keep merging summaries pairwise until the data fits or nothing more can be merged
+      while (data.length > maxSize) {
+        boolean collapsedSome = false;
+        for (LgBuilder lgBuilder : locGroups) {
+          collapsedSome |= lgBuilder.collapse();
+        }
+
+        if (collapsedSome) {
+          data = toBytes();
+        } else {
+          break;
+        }
+      }
+
+      if (data.length > maxSize) {
+        dos.writeBoolean(true);
+      } else {
+        dos.writeBoolean(false);
+        // write this out to support efficient skipping
+        WritableUtils.writeVInt(dos, data.length);
+        dos.write(data);
+      }
+    }
+
+    /**
+     * Serializes all locality groups: a symbol table of every statistic name, then each locality group, with statistic names written as symbol indexes.
+     */
+    private byte[] toBytes() throws IOException {
+
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) {
+        // create a symbol table, assigning indexes in first-seen order
+        HashMap<String,Integer> symbolTable = new HashMap<>();
+        ArrayList<String> symbols = new ArrayList<>();
+        for (LgBuilder lg : locGroups) {
+          for (SummaryInfo si : lg.summaries) {
+            for (String symbol : si.summary.keySet()) {
+              if (!symbolTable.containsKey(symbol)) {
+                symbolTable.put(symbol, symbols.size());
+                symbols.add(symbol);
+              }
+            }
+          }
+        }
+
+        // write symbol table
+        WritableUtils.writeVInt(dos, symbols.size());
+        for (String symbol : symbols) {
+          dos.writeUTF(symbol);
+        }
+
+        WritableUtils.writeVInt(dos, locGroups.size());
+        for (LgBuilder lg : locGroups) {
+          lg.save(dos, symbolTable);
+        }
+
+        dos.close();
+        return baos.toByteArray();
+      }
+    }
+
+    /** Finishes the locality group currently being built, if any, and records it as complete. */
+    private void finishCurrentLocalityGroup() {
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+    }
+
+    public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) {
+      finishCurrentLocalityGroup();
+      lgb = new LgBuilder(conf, kvs, name);
+    }
+
+    public void startDefaultLocalityGroup() {
+      finishCurrentLocalityGroup();
+      lgb = new LgBuilder(conf, kvs);
+    }
+  }
+
+  public static Builder builder(SummarizerConfiguration conf, SummarizerFactory factory, long maxSize) {
+    return new Builder(conf, factory.getSummarizer(conf), maxSize);
+  }
+
+  /**
+   * Skips over data written by {@link Builder#save(DataOutputStream)}.
+   *
+   * @throws EOFException
+   *           if the stream ends before all of the data was skipped
+   */
+  static void skip(DataInputStream in) throws IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      long len = WritableUtils.readVInt(in);
+      long remaining = len;
+      while (remaining > 0) {
+        long skipped = in.skip(remaining);
+        if (skipped <= 0) {
+          // skip() may return 0 without being at EOF; read one byte to tell the two cases apart instead of looping forever
+          if (in.read() == -1) {
+            throw new EOFException("Expected to skip " + len + " bytes of summary data but the stream ended after " + (len - remaining));
+          }
+          skipped = 1;
+        }
+        remaining -= skipped;
+      }
+    }
+  }
+
+  /** Reads data written by {@link Builder#save(DataOutputStream)}. */
+  static SummarySerializer load(SummarizerConfiguration sconf, DataInputStream in) throws IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      // total data length; only needed by skip()
+      WritableUtils.readVInt(in);
+      // load symbol table
+      int numSymbols = WritableUtils.readVInt(in);
+      String[] symbols = new String[numSymbols];
+      for (int i = 0; i < numSymbols; i++) {
+        symbols[i] = in.readUTF();
+      }
+
+      int numLGroups = WritableUtils.readVInt(in);
+      LgSummaries[] allSummaries = new LgSummaries[numLGroups];
+      for (int i = 0; i < numLGroups; i++) {
+        allSummaries[i] = readLGroup(in, symbols);
+      }
+
+      return new SummarySerializer(sconf, allSummaries);
+    } else {
+      return new SummarySerializer(sconf);
+    }
+  }
+
+  /** The deserialized summaries of one locality group, ordered by row. */
+  private static class LgSummaries {
+
+    private Text firstRow;
+    private SummaryInfo[] summaries;
+    private String lgroupName;
+
+    LgSummaries(Text firstRow, SummaryInfo[] summaries, String lgroupName) {
+      this.firstRow = firstRow;
+      this.summaries = summaries;
+      this.lgroupName = lgroupName;
+    }
+
+    /** Returns true if this locality group has data both inside and outside the row range (startRow, endRow]; null bounds are unbounded. */
+    boolean exceedsRange(Text startRow, Text endRow) {
+
+      if (summaries.length == 0) {
+        // an empty locality group has no rows, so none can lie outside the range
+        return false;
+      }
+
+      Text lastRow = summaries[summaries.length - 1].lastRow;
+      if (startRow != null && firstRow.compareTo(startRow) <= 0 && startRow.compareTo(lastRow) < 0) {
+        return true;
+      }
+
+      if (endRow != null && endRow.compareTo(firstRow) >= 0 && lastRow.compareTo(endRow) > 0) {
+        return true;
+      }
+
+      return false;
+    }
+
+    void print(String prefix, String indent, PrintStream out) {
+      String p = prefix + indent;
+      out.printf("%sLocality group : %s\n", p, lgroupName);
+      p += indent;
+      for (SummaryInfo si : summaries) {
+        out.printf("%sSummary of %d key values (row of last key '%s') : \n", p, si.count, si.lastRow);
+        Set<Entry<String,Long>> es = si.summary.entrySet();
+        String p2 = p + indent;
+        for (Entry<String,Long> entry : es) {
+          out.printf("%s%s = %s\n", p2, entry.getKey(), entry.getValue());
+        }
+      }
+    }
+
+    /** Merges into {@code summary} every stored summary that overlaps at least one of {@code ranges}; each overlapping summary is merged only once. */
+    void getSummary(List<RowRange> ranges, Combiner combiner, Map<String,Long> summary) {
+      boolean[] summariesThatOverlap = new boolean[summaries.length];
+
+      for (RowRange keyExtent : ranges) {
+        Text startRow = keyExtent.getStartRow();
+        Text endRow = keyExtent.getEndRow();
+
+        if (endRow != null && endRow.compareTo(firstRow) < 0) {
+          // range ends before this locality group's first row
+          continue;
+        }
+
+        int start = -1;
+        int end = summaries.length - 1;
+
+        if (startRow == null) {
+          start = 0;
+        } else {
+          // first summary whose last row is past the range start
+          for (int i = 0; i < summaries.length; i++) {
+            if (startRow.compareTo(summaries[i].getLastRow()) < 0) {
+              start = i;
+              break;
+            }
+          }
+        }
+
+        if (start == -1) {
+          // range starts after this locality group's last row
+          continue;
+        }
+
+        if (endRow == null) {
+          end = summaries.length - 1;
+        } else {
+          for (int i = start; i < summaries.length; i++) {
+            if (endRow.compareTo(summaries[i].getLastRow()) < 0) {
+              end = i;
+              break;
+            }
+          }
+        }
+
+        for (int i = start; i <= end; i++) {
+          summariesThatOverlap[i] = true;
+        }
+      }
+
+      for (int i = 0; i < summaries.length; i++) {
+        if (summariesThatOverlap[i]) {
+          combiner.merge(summary, summaries[i].summary);
+        }
+      }
+    }
+  }
+
+  /** Reads one locality group as written by LgBuilder.save. */
+  private static LgSummaries readLGroup(DataInputStream in, String[] symbols) throws IOException {
+    String lgroupName = in.readUTF();
+
+    // read first row
+    Text firstRow = new Text();
+    firstRow.readFields(in);
+
+    // read summaries
+    int numSummaries = WritableUtils.readVInt(in);
+    SummaryInfo[] summaries = new SummaryInfo[numSummaries];
+    for (int i = 0; i < numSummaries; i++) {
+      int rowLen = WritableUtils.readVInt(in);
+      byte[] row = new byte[rowLen];
+      in.readFully(row);
+      int count = WritableUtils.readVInt(in);
+      Map<String,Long> summary = readSummary(in, symbols);
+      summaries[i] = new SummaryInfo(row, summary, count);
+    }
+
+    return new LgSummaries(firstRow, summaries, lgroupName);
+  }
+
+  /** Reads one summary, resolving statistic names through {@code symbols}. The returned map is immutable. */
+  private static Map<String,Long> readSummary(DataInputStream in, String[] symbols) throws IOException {
+    ImmutableMap.Builder<String,Long> imb = ImmutableMap.builder();
+    int numEntries = WritableUtils.readVInt(in);
+
+    for (int i = 0; i < numEntries; i++) {
+      String symbol = symbols[WritableUtils.readVInt(in)];
+      imb.put(symbol, WritableUtils.readVLong(in));
+    }
+
+    return imb.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
new file mode 100644
index 0000000..1ebeeae
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.accumulo.core.summary;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.hadoop.io.WritableUtils;

/**
 * A {@link FileSKVWriter} decorator that forwards every call to a wrapped writer while feeding each
 * appended key/value to one summary builder per configured summarizer.
 *
 * <p>On {@link #close()}, it stores an index meta block and one data meta block per summarizer in
 * the wrapped file, then closes the wrapped writer.
 */
public class SummaryWriter implements FileSKVWriter {

  static final String METASTORE_PREFIX = "accumulo.summaries";
  static final String METASTORE_INDEX = "accumulo.summaries.index";

  // echo "accumulo summarize" | sha1sum | head -c 8
  static final long MAGIC = 0x15ea283ec03e4c49L;
  static final byte VER = 1;

  private final FileSKVWriter writer;
  private final SummarySerializer.Builder[] summaryStores;

  private SummaryWriter(FileSKVWriter writer, SummarizerFactory factory, List<SummarizerConfiguration> configs, long maxSize) {
    this.writer = writer;
    int i = 0;
    summaryStores = new SummarySerializer.Builder[configs.size()];
    for (SummarizerConfiguration sconf : configs) {
      summaryStores[i++] = SummarySerializer.builder(sconf, factory, maxSize);
    }
  }

  @Override
  public boolean supportsLocalityGroups() {
    return true;
  }

  @Override
  public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
    for (SummarySerializer.Builder ssb : summaryStores) {
      ssb.startNewLocalityGroup(name, columnFamilies);
    }

    writer.startNewLocalityGroup(name, columnFamilies);
  }

  @Override
  public void startDefaultLocalityGroup() throws IOException {
    for (SummarySerializer.Builder ssb : summaryStores) {
      ssb.startDefaultLocalityGroup();
    }
    writer.startDefaultLocalityGroup();
  }

  /** Appends to the wrapped writer, then offers the same entry to every summary builder. */
  @Override
  public void append(Key key, Value value) throws IOException {
    writer.append(key, value);
    for (SummarySerializer.Builder ssb : summaryStores) {
      ssb.put(key, value);
    }
  }

  @Override
  public DataOutputStream createMetaStore(String name) throws IOException {
    return writer.createMetaStore(name);
  }

  /**
   * Serializes a summarizer configuration: UTF class name, UTF property id, a VInt option count,
   * then each option as a UTF key/value pair.
   */
  public void writeConfig(SummarizerConfiguration conf, DataOutputStream dos) throws IOException {
    // save class (and its config) used to generate summaries
    dos.writeUTF(conf.getClassName());
    dos.writeUTF(conf.getPropertyId());
    WritableUtils.writeVInt(dos, conf.getOptions().size());
    for (Entry<String,String> entry : conf.getOptions().entrySet()) {
      dos.writeUTF(entry.getKey());
      dos.writeUTF(entry.getValue());
    }
  }

  /**
   * Writes the summary index and summary data meta blocks, then closes the wrapped writer.
   *
   * <p>The wrapped writer is closed even if writing the summaries fails, so its underlying resources
   * are not leaked. In that case the original failure is rethrown, and any failure from closing is
   * attached to it as a suppressed exception.
   */
  @Override
  public void close() throws IOException {
    try {
      writeIndex();
      writeSummaryBlocks();
    } catch (IOException | RuntimeException e) {
      try {
        writer.close();
      } catch (IOException | RuntimeException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
    writer.close();
  }

  // Writes the index meta block: magic, version, and per summarizer its config and data location.
  private void writeIndex() throws IOException {
    DataOutputStream out = writer.createMetaStore(METASTORE_INDEX);
    out.writeLong(MAGIC);
    out.write(VER);
    WritableUtils.writeVInt(out, summaryStores.length);

    // Could possibly inline small summaries in the future. Breaking summaries into multiple block is better for caching a subset of summaries. Also, keeping
    // the index small is good for the case where summaries that do not exist are requested. However multiple blocks cause more random I/O in the case when its
    // not yet in the cache.

    for (int i = 0; i < summaryStores.length; i++) {
      writeConfig(summaryStores[i].getSummarizerConfiguration(), out);
      // write if summary is inlined in index... support for possible future optimizations.
      out.writeBoolean(false);
      // write pointer to block that will contain summary data
      WritableUtils.writeVInt(out, i);
      // write offset of summary data within block. This is not currently used, but it supports storing multiple summaries in an external block in the
      // future without changing the code.
      WritableUtils.writeVInt(out, 0);
    }
    out.close();
  }

  // Writes each summarizer's data to its own meta block, named METASTORE_PREFIX + "." + index.
  private void writeSummaryBlocks() throws IOException {
    for (int i = 0; i < summaryStores.length; i++) {
      DataOutputStream summaryOut = writer.createMetaStore(METASTORE_PREFIX + "." + i);
      summaryStores[i].save(summaryOut);
      summaryOut.close();
    }
  }

  @Override
  public long getLength() throws IOException {
    return writer.getLength();
  }

  /**
   * Wraps {@code writer} so that it also collects summaries for the summarizers configured in
   * {@code tableConfig}.
   *
   * @param writer the file writer to wrap
   * @param tableConfig table configuration supplying summarizer configs and the
   *        {@code TABLE_FILE_SUMMARY_MAX_SIZE} limit
   * @param useAccumuloStart selects {@code new SummarizerFactory(tableConfig)} when true and the
   *        no-arg factory when false (NOTE(review): presumably this controls which class loader
   *        summarizers are loaded through; confirm in SummarizerFactory)
   * @return {@code writer} itself when no summarizers are configured, otherwise a wrapping
   *         {@code SummaryWriter}
   */
  public static FileSKVWriter wrap(FileSKVWriter writer, AccumuloConfiguration tableConfig, boolean useAccumuloStart) {
    List<SummarizerConfiguration> configs = SummarizerConfigurationUtil.getSummarizerConfigs(tableConfig);

    if (configs.isEmpty()) {
      return writer;
    }

    SummarizerFactory factory;
    if (useAccumuloStart) {
      factory = new SummarizerFactory(tableConfig);
    } else {
      factory = new SummarizerFactory();
    }

    long maxSize = tableConfig.getMemoryInBytes(Property.TABLE_FILE_SUMMARY_MAX_SIZE);
    return new SummaryWriter(writer, factory, configs, maxSize);
  }
}