Repository: kylin Updated Branches: refs/heads/2.x-staging 35a5d87af -> e58326999
KYLIN-1233 Spill to disk when AggregationCache need too much memory Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eaed4f6b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eaed4f6b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eaed4f6b Branch: refs/heads/2.x-staging Commit: eaed4f6bdf63385590b02360e0f5834478088db2 Parents: 35a5d87 Author: lidongsjtu <don...@ebay.com> Authored: Sun Dec 27 11:05:11 2015 +0800 Committer: lidongsjtu <don...@ebay.com> Committed: Sun Dec 27 11:05:11 2015 +0800 ---------------------------------------------------------------------- .../common/hll/HyperLogLogPlusCounter.java | 2 +- .../java/org/apache/kylin/common/util/Pair.java | 46 ++- .../cube/inmemcubing/InMemCubeBuilder.java | 2 +- .../org/apache/kylin/cube/util/KryoUtils.java | 2 +- .../kylin/gridtable/GTAggregateScanner.java | 406 +++++++++++++++---- .../apache/kylin/gridtable/GTScanRequest.java | 17 +- .../coprocessor/endpoint/CubeVisitService.java | 10 +- 7 files changed, 377 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java index ef91509..11ae78f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java +++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java @@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog private final int p; private final int m; - private final HashFunction hashFunc; + private transient final HashFunction hashFunc; byte[] registers; public HyperLogLogPlusCounter() { http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java index d28b05f..9e4e9ee 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java @@ -28,7 +28,7 @@ import java.io.Serializable; */ public class Pair<T1, T2> implements Serializable { private static final long serialVersionUID = -3986244606585552569L; - + protected T1 first = null; protected T2 second = null; @@ -60,6 +60,18 @@ public class Pair<T1, T2> implements Serializable { return new Pair<T1, T2>(a, b); } + private static boolean equals(Object x, Object y) { + return (x == null && y == null) || (x != null && x.equals(y)); + } + + /** + * Return the first element stored in the pair. + * @return T1 + */ + public T1 getFirst() { + return first; + } + /** * Replace the first element of the pair. * @param a operand @@ -68,20 +80,12 @@ public class Pair<T1, T2> implements Serializable { this.first = a; } - /** - * Replace the second element of the pair. - * @param b operand - */ - public void setSecond(T2 b) { - this.second = b; + public T1 getKey() { + return getFirst(); } - /** - * Return the first element stored in the pair. - * @return T1 - */ - public T1 getFirst() { - return first; + public void setKey(T1 a) { + setFirst(a); } /** @@ -92,8 +96,20 @@ public class Pair<T1, T2> implements Serializable { return second; } - private static boolean equals(Object x, Object y) { - return (x == null && y == null) || (x != null && x.equals(y)); + /** + * Replace the second element of the pair. + * @param b operand + */ + public void setSecond(T2 b) { + this.second = b; + } + + public T2 getValue() { + return getSecond(); + } + + public void setValue(T2 b) { + setSecond(b); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 575583f..4bad818 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -330,7 +330,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount); GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); - GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true); + GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); aggregationScanner.trackMemoryLevel(baseCuboidMemTracker); int count = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java index 9dbe0d2..48f925a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java @@ -48,7 +48,7 @@ public class KryoUtils { return deserialize(bytes, clazz); } - private static Kryo getKryo() { + public static Kryo getKryo() { if (_Kryo.get() == null) { Kryo kryo = new Kryo(); kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 658a08f..a760b92 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -1,26 +1,37 @@ package org.apache.kylin.gridtable; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; +import java.util.PriorityQueue; import java.util.SortedMap; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.util.KryoUtils; import org.apache.kylin.measure.MeasureAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; @SuppressWarnings({ "rawtypes", "unchecked" }) public class GTAggregateScanner implements IGTScanner { - @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); final GTInfo info; @@ -30,13 +41,13 @@ public class GTAggregateScanner implements IGTScanner { final String[] metricsAggrFuncs; final IGTScanner inputScanner; final AggregationCache aggrCache; - final boolean enableMemCheck; + final long spillThreshold; private int aggregatedRowCount = 0; private MemoryWaterLevel memTracker; - public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) { - if (req.hasAggregation() == false) + public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) { + if (!req.hasAggregation()) throw new IllegalStateException(); this.info = inputScanner.getInfo(); @@ -46,7 +57,31 @@ public class GTAggregateScanner implements IGTScanner { this.metricsAggrFuncs = req.getAggrMetricsFuncs(); this.inputScanner = inputScanner; this.aggrCache = new AggregationCache(); - this.enableMemCheck = enableMemCheck; + this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB); + } + + public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { + // Aggregation cache is basically a tree map. The tree map entry overhead is + // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/ + // - 41~52 according to AggregationCacheMemSizeTest + return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size; + } + + public static long estimateSizeOf(MeasureAggregator[] aggrs) { + // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc.. + // Memory alignment to 8 bytes + long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */); + for (MeasureAggregator aggr : aggrs) { + if (aggr != null) + est += aggr.getMemBytesEstimate(); + } + return est; + } + + public static long estimateSizeOf(byte[] bytes) { + // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16] + // Memory alignment to 8 bytes + return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */); } public void trackMemoryLevel(MemoryWaterLevel tracker) { @@ -66,6 +101,7 @@ public class GTAggregateScanner implements IGTScanner { @Override public void close() throws IOException { inputScanner.close(); + aggrCache.close(); } @Override @@ -76,40 +112,51 @@ public class GTAggregateScanner implements IGTScanner { return aggrCache.iterator(); } + public int getNumOfSpills() { + return aggrCache.dumps.size(); + } + /** return the estimate memory size of aggregation cache */ public long getEstimateSizeOfAggrCache() { return aggrCache.estimatedMemSize(); } - class AggregationCache { - final SortedMap<byte[], MeasureAggregator[]> aggBufMap; + class AggregationCache implements Closeable { + final List<Dump> dumps; final int keyLength; final boolean[] compareMask; - public AggregationCache() { - compareMask = createCompareMask(); - keyLength = compareMask.length; - aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() { - @Override - public int compare(byte[] o1, byte[] o2) { - int result = 0; - // profiler shows this check is slow - // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length); - for (int i = 0; i < keyLength; ++i) { - if (compareMask[i]) { - int a = (o1[i] & 0xff); - int b = (o2[i] & 0xff); - result = a - b; - if (result == 0) { - continue; - } else { - return result; - } + final Kryo kryo = KryoUtils.getKryo(); + + final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { + @Override + public int compare(byte[] o1, byte[] o2) { + int result = 0; + // profiler shows this check is slow + // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length); + for (int i = 0; i < keyLength; ++i) { + if (compareMask[i]) { + int a = (o1[i] & 0xff); + int b = (o2[i] & 0xff); + result = a - b; + if (result == 0) { + continue; + } else { + return result; } } - return result; } - }); + return result; + } + }; + + SortedMap<byte[], MeasureAggregator[]> aggBufMap; + + public AggregationCache() { + compareMask = createCompareMask(); + keyLength = compareMask.length; + dumps = Lists.newArrayList(); + aggBufMap = createBuffMap(); } private boolean[] createCompareMask() { @@ -133,6 +180,10 @@ public class GTAggregateScanner implements IGTScanner { return mask; } + private SortedMap<byte[], MeasureAggregator[]> createBuffMap() { + return Maps.newTreeMap(bytesComparator); + } + private byte[] createKey(GTRecord record) { byte[] result = new byte[keyLength]; int offset = 0; @@ -148,13 +199,15 @@ public class GTAggregateScanner implements IGTScanner { } void aggregate(GTRecord r) { - if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) { + if (++aggregatedRowCount % 1000 == 0) { if (memTracker != null) { memTracker.markHigh(); } - long estimated = estimatedMemSize(); - if (estimated > 10 * MemoryBudgetController.ONE_GB) { - throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated); + if (spillThreshold > 0) { + // spill to disk when aggBufMap used too large memory + if (estimatedMemSize() > spillThreshold) { + spillBuffMap(); + } } } @@ -171,6 +224,31 @@ public class GTAggregateScanner implements IGTScanner { } } + private void spillBuffMap() throws RuntimeException { + if (aggBufMap.isEmpty()) + return; + + try { + Dump dump = new Dump(aggBufMap); + dump.flush(); + dumps.add(dump); + aggBufMap = createBuffMap(); + } catch (Exception e) { + throw new RuntimeException("AggregationCache spill failed: " + e.getMessage()); + } + } + + @Override + public void close() throws RuntimeException { + try { + for (Dump dump : dumps) { + dump.terminate(); + } + } catch (Exception e) { + throw new RuntimeException("AggregationCache close failed: " + e.getMessage()); + } + } + private MeasureAggregator[] newAggregators() { return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs); } @@ -185,71 +263,233 @@ public class GTAggregateScanner implements IGTScanner { } public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { + // the all-in-mem case + if (dumps.isEmpty()) { + return new Iterator<GTRecord>() { + + final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator(); + final ReturningRecord returningRecord = new ReturningRecord(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } - final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator(); + @Override + public GTRecord next() { + Entry<byte[], MeasureAggregator[]> entry = it.next(); + returningRecord.load(entry.getKey(), entry.getValue()); + return returningRecord.record; + } - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); - final GTRecord secondRecord = new GTRecord(info); + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + // the spill case + else { + logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache()); + this.spillBuffMap(); + + return new Iterator<GTRecord>() { + final DumpMerger merger = new DumpMerger(dumps); + final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator(); + final ReturningRecord returningRecord = new ReturningRecord(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } - @Override - public boolean hasNext() { - return it.hasNext(); + @Override + public GTRecord next() { + Pair<byte[], MeasureAggregator[]> entry = it.next(); + returningRecord.load(entry.getKey(), entry.getValue()); + return returningRecord.record; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + + class ReturningRecord { + final GTRecord record = new GTRecord(info); + final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); + + void load(byte[] key, MeasureAggregator[] value) { + int offset = 0; + for (int i = 0; i < dimensions.trueBitCount(); i++) { + int c = dimensions.trueBitAt(i); + final int columnLength = info.codeSystem.maxCodeLength(c); + record.cols[c].set(key, offset, columnLength); + offset += columnLength; + } + metricsBuf.clear(); + for (int i = 0; i < value.length; i++) { + int col = metrics.trueBitAt(i); + int pos = metricsBuf.position(); + info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); + record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); } + } + } + + class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> { + File dumpedFile; + Input input; + SortedMap<byte[], MeasureAggregator[]> buffMap; + + public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException { + this.buffMap = buffMap; + } + + @Override + public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() { + try { + if (dumpedFile == null || !dumpedFile.exists()) { + throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); + } + + input = new Input(new FileInputStream(dumpedFile)); + + final int count = kryo.readObject(input, Integer.class); + return new Iterator<Pair<byte[], MeasureAggregator[]>>() { + int cursorIdx = 0; + + @Override + public boolean hasNext() { + return cursorIdx < count; + } + + @Override + public Pair<byte[], MeasureAggregator[]> next() { + try { + cursorIdx++; + return (Pair<byte[], MeasureAggregator[]>) kryo.readObject(input, Pair.class); + } catch (Exception e) { + throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage()); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } catch (Exception e) { + throw new RuntimeException("Failed to read dumped file: " + e.getMessage()); + } + } - @Override - public GTRecord next() { - Entry<byte[], MeasureAggregator[]> entry = it.next(); - create(entry.getKey(), entry.getValue()); - return secondRecord; + public void flush() throws IOException { + if (buffMap != null) { + Output output = null; + try { + dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp"); + + logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath()); + output = new Output(new FileOutputStream(dumpedFile)); + kryo.writeObject(output, buffMap.size()); + for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) { + kryo.writeObject(output, new Pair(entry.getKey(), entry.getValue())); + } + } finally { + buffMap = null; + if (output != null) + output.close(); + } } + } - private void create(byte[] key, MeasureAggregator[] value) { - int offset = 0; - for (int i = 0; i < dimensions.trueBitCount(); i++) { - int c = dimensions.trueBitAt(i); - final int columnLength = info.codeSystem.maxCodeLength(c); - secondRecord.set(c, new ByteArray(key, offset, columnLength)); - offset += columnLength; + public void terminate() throws IOException { + buffMap = null; + if (input != null) + input.close(); + if (dumpedFile != null && dumpedFile.exists()) + dumpedFile.delete(); + } + } + + class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> { + final PriorityQueue<Pair<byte[], Integer>> minHeap; + final List<Iterator<Pair<byte[], MeasureAggregator[]>>> dumpIterators; + final List<MeasureAggregator[]> dumpCurrentValues; + + public DumpMerger(List<Dump> dumps) { + minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() { + @Override + public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) { + return bytesComparator.compare(o1.getFirst(), o2.getFirst()); } - metricsBuf.clear(); - for (int i = 0; i < value.length; i++) { - int col = metrics.trueBitAt(i); - int pos = metricsBuf.position(); - info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); - secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); + }); + dumpIterators = Lists.newArrayListWithCapacity(dumps.size()); + dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size()); + + Iterator<Pair<byte[], MeasureAggregator[]>> it; + for (int i = 0; i < dumps.size(); i++) { + it = dumps.get(i).iterator(); + if (it.hasNext()) { + dumpIterators.add(i, it); + Pair<byte[], MeasureAggregator[]> entry = it.next(); + minHeap.offer(new Pair(entry.getKey(), i)); + dumpCurrentValues.add(i, entry.getValue()); + } else { + dumpIterators.add(i, null); + dumpCurrentValues.add(i, null); } } + } - @Override - public void remove() { - throw new UnsupportedOperationException(); + private void enqueueFromDump(int index) { + if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) { + Pair<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next(); + minHeap.offer(new Pair(entry.getKey(), index)); + dumpCurrentValues.set(index, entry.getValue()); } - }; - } - } + } - public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { - // Aggregation cache is basically a tree map. The tree map entry overhead is - // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/ - // - 41~52 according to AggregationCacheMemSizeTest - return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size; - } + @Override + public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() { + return new Iterator<Pair<byte[], MeasureAggregator[]>>() { + @Override + public boolean hasNext() { + return !minHeap.isEmpty(); + } - public static long estimateSizeOf(MeasureAggregator[] aggrs) { - // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc.. - // Memory alignment to 8 bytes - long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */); - for (MeasureAggregator aggr : aggrs) { - if (aggr != null) - est += aggr.getMemBytesEstimate(); - } - return est; - } + @Override + public Pair<byte[], MeasureAggregator[]> next() { + // Use minimum heap to merge sort the keys, + // also do aggregation for measures with same keys in different dumps + Pair<byte[], Integer> peekEntry = minHeap.poll(); + MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue()); + enqueueFromDump(peekEntry.getValue()); - public static long estimateSizeOf(byte[] bytes) { - // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16] - // Memory alignment to 8 bytes - return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */); + while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) { + Pair<byte[], Integer> newPeek = minHeap.poll(); + + MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue()); + for (int i = 0; i < newPeekAggr.length; i++) { + mergedAggr[i].aggregate(newPeekAggr[i].getState()); + } + + enqueueFromDump(newPeek.getValue()); + } + + return new Pair(peekEntry.getKey(), mergedAggr); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 2c284c9..ac99d4e 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -29,6 +29,7 @@ public class GTScanRequest { // hint to storage behavior private boolean allowPreAggregation = true; + private double aggrCacheGB = 0; // no limit public GTScanRequest(GTInfo info) { this(info, null, null, null); @@ -118,7 +119,7 @@ public class GTScanRequest { } public IGTScanner decorateScanner(IGTScanner scanner) throws IOException { - return decorateScanner(scanner, true, true, false);//by default do not check mem + return decorateScanner(scanner, true, true);//by default do not check mem } /** @@ -127,7 +128,7 @@ public class GTScanRequest { * * Refer to CoprocessorBehavior for explanation */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, boolean doMemCheck) throws IOException { + public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException { IGTScanner result = scanner; if (!doFilter) { //Skip reading this section if you're not profiling! lookAndForget(result); @@ -144,13 +145,13 @@ public class GTScanRequest { } if (this.allowPreAggregation && this.hasAggregation()) { - result = new GTAggregateScanner(result, this, doMemCheck); + result = new GTAggregateScanner(result, this); } return result; } } - //touch every byte of the cell so that the cost of scanning will be trully reflected + //touch every byte of the cell so that the cost of scanning will be truly reflected private void lookAndForget(IGTScanner scanner) { byte meaninglessByte = 0; for (GTRecord gtRecord : scanner) { @@ -215,6 +216,14 @@ public class GTScanRequest { return aggrMetricsFuncs; } + public double getAggrCacheGB() { + return aggrCacheGB; + } + + public void setAggrCacheGB(double gb) { + this.aggrCacheGB = gb; + } + @Override public String toString() { return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3759738..6feed33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -147,14 +147,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement innerScanner = region.getScanner(scan); InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); + CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); + if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { + if (scanReq.getAggrCacheGB() <= 0) + scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit from v1.0 + } + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); IGTScanner rawScanner = store.scan(scanReq); - CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,// behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(),// - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(),// - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal());// + behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);