Repository: kylin Updated Branches: refs/heads/2.x-staging 1410d9d30 -> db95d72ca
KYLIN-1245 Save 'mapper overlap ratio' in cube stats Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/db95d72c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/db95d72c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/db95d72c Branch: refs/heads/2.x-staging Commit: db95d72caa801151824177c613acc91c5549f478 Parents: 1410d9d Author: Li, Yang <yang...@ebay.com> Authored: Wed Dec 23 14:02:17 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Wed Dec 23 14:03:25 2015 +0800 ---------------------------------------------------------------------- build/conf/kylin_hive_conf.xml | 2 +- .../org/apache/kylin/cube/CubeInstance.java | 10 - .../java/org/apache/kylin/cube/CubeManager.java | 6 +- .../org/apache/kylin/cube/CubeSegmentsTest.java | 2 +- .../kylin/engine/mr/common/CubeStatsReader.java | 227 +++++++++++++++++++ .../kylin/engine/mr/common/CuboidStatsUtil.java | 14 +- .../mr/steps/FactDistinctColumnsReducer.java | 19 +- .../kylin/engine/mr/steps/InMemCuboidJob.java | 153 +------------ .../apache/kylin/engine/spark/SparkCubing.java | 5 +- .../streaming/monitor/StreamingMonitor.java | 2 +- .../apache/kylin/rest/service/CubeService.java | 2 +- .../apache/kylin/rest/service/JobService.java | 2 +- .../storage/hbase/steps/CreateHTableJob.java | 3 +- 13 files changed, 267 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/build/conf/kylin_hive_conf.xml ---------------------------------------------------------------------- diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml index afa53f7..f91f489 100644 --- a/build/conf/kylin_hive_conf.xml +++ b/build/conf/kylin_hive_conf.xml @@ -8,7 +8,7 @@ <property> <name>dfs.block.size</name> - <value>10485760</value> + <value>32000000</value> <description>Want more mappers for in-mem cubing, thus smaller the DFS block size</description> </property> http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index dccc3f1..8363a2b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -299,16 +299,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return result; } - public List<CubeSegment> getSegment(SegmentStatusEnum status) { - List<CubeSegment> result = Lists.newArrayList(); - for (CubeSegment segment : segments) { - if (segment.getStatus() == status) { - result.add(segment); - } - } - return result; - } - public CubeSegment getSegment(String name, SegmentStatusEnum status) { for (CubeSegment segment : segments) { if ((null != segment.getName() && segment.getName().equals(name)) && segment.getStatus() == status) { http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 88904bb..8e4906b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -271,7 +271,7 @@ public class CubeManager implements IRealizationProvider { } private boolean validateReadySegments(CubeInstance cube) { - final List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY); + final List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); if (readySegments.size() == 0) { return true; } @@ -475,7 +475,7 @@ public class CubeManager implements IRealizationProvider { } private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) { - List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY); + List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); if (readySegments.isEmpty()) { throw new IllegalStateException("there are no segments in ready state"); } @@ -610,7 +610,7 @@ public class CubeManager implements IRealizationProvider { return null; } - List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY)); + List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY)); if (readySegments.size() == 0) { logger.debug("Cube " + cube.getName() + " has no ready segment to merge"); http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java index 6d1d39b..c1f55d1 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java @@ -65,7 +65,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // assert one ready segment assertEquals(1, cube.getSegments().size()); - CubeSegment seg = cube.getSegment(SegmentStatusEnum.READY).get(0); + CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0); assertEquals(SegmentStatusEnum.READY, seg.getStatus()); // append again, for non-partitioned cube, it becomes a full refresh http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java new file mode 100644 index 0000000..fc27a81 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Reader.Option; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.steps.InMemCuboidJob; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * This should be in cube module. It's here in engine-mr because currently stats + * are saved as sequence files thus a hadoop dependency. + */ +public class CubeStatsReader { + + private static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class); + + final CubeSegment seg; + final int samplingPercentage; + final double mapperOverlapRatioOfFirstBuild; + final Map<Long, HyperLogLogPlusCounter> cuboidRowCountMap; + + public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { + ResourceStore store = ResourceStore.getStore(kylinConfig); + String statsKey = cubeSegment.getStatisticsResourcePath(); + InputStream is = store.getResource(statsKey).inputStream; + Reader reader = null; + + try { + Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); + + Option streamInput = SequenceFile.Reader.stream(new FSDataInputStream(is)); + reader = new SequenceFile.Reader(hadoopConf, streamInput); + + int percentage = 100; + double mapperOverlapRatio = 0; + Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap(); + + LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); + BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf); + while (reader.next(key, value)) { + if (key.get() == 0L) { + percentage = Bytes.toInt(value.getBytes()); + } else if (key.get() == -1) { + mapperOverlapRatio = Bytes.toDouble(value.getBytes()); + } else { + HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); + ByteArray byteArray = new ByteArray(value.getBytes()); + hll.readRegisters(byteArray.asBuffer()); + counterMap.put(key.get(), hll); + } + } + + this.seg = cubeSegment; + this.samplingPercentage = percentage; + this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio; + this.cuboidRowCountMap = counterMap; + + } finally { + IOUtils.closeStream(reader); + IOUtils.closeStream(is); + } + } + + public Map<Long, Long> getCuboidRowCountMap() { + return getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage); + } + + // return map of Cuboid ID => MB + public Map<Long, Double> getCuboidSizeMap() { + return getCuboidSizeMapFromRowCount(seg, getCuboidRowCountMap()); + } + + public void print(PrintWriter out) { + Map<Long, Long> rowCountMap = getCuboidRowCountMap(); + Map<Long, Double> sizeMap = getCuboidSizeMap(); + List<Long> cuboids = new ArrayList<Long>(rowCountMap.keySet()); + Collections.sort(cuboids); + + out.println("============================================================================"); + out.println("Statistics of " + seg); + out.println(" Sampling percentage: " + samplingPercentage); + out.println(" Mapper overlap ratio: " + mapperOverlapRatioOfFirstBuild); + for (Long cuboid : cuboids) { + out.println(" Cuboid :\t" + rowCountMap.get(cuboid) + " rows, " + sizeMap.get(cuboid) + " MB"); + } + out.println("----------------------------------------------------------------------------"); + } + + public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HyperLogLogPlusCounter> hllcMap, int samplingPercentage) { + return Maps.transformValues(hllcMap, new Function<HyperLogLogPlusCounter, Long>() { + @Nullable + @Override + public Long apply(HyperLogLogPlusCounter input) { + // No need to adjust according sampling percentage. Assumption is that data set is far + // more than cardinality. Even a percentage of the data should already see all cardinalities. + return input.getCountEstimate(); + } + }); + } + + public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { + final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + final List<Integer> rowkeyColumnSize = Lists.newArrayList(); + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final List<TblColRef> columnList = baseCuboid.getColumns(); + + for (int i = 0; i < columnList.size(); i++) { + rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i))); + } + + Map<Long, Double> sizeMap = Maps.newHashMap(); + for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) { + sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize)); + } + return sizeMap; + } + + /** + * Estimate the cuboid's size + * + * @return the cuboid size in M bytes + */ + private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { + + int bytesLength = cubeSegment.getRowKeyPreambleSize(); + + long mask = Long.highestOneBit(baseCuboidId); + long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId); + for (int i = 0; i < parentCuboidIdActualLength; i++) { + if ((mask & cuboidId) > 0) { + bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i)); + } + mask = mask >> 1; + } + + // add the measure length + int space = 0; + boolean isMemoryHungry = false; + for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { + if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) { + isMemoryHungry = true; + } + DataType returnType = measureDesc.getFunction().getReturnDataType(); + space += returnType.getStorageBytesEstimate(); + } + bytesLength += space; + + double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L); + if (isMemoryHungry) { + logger.info("Cube is memory hungry, storage size estimation multiply 0.05"); + ret *= 0.05; + } else { + logger.info("Cube is not memory hungry, storage size estimation multiply 0.25"); + ret *= 0.25; + } + logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M."); + return ret; + } + + public static void main(String[] args) throws IOException { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + CubeInstance cube = CubeManager.getInstance(config).getCube(args[0]); + List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY); + + PrintWriter out = new PrintWriter(System.out); + for (CubeSegment seg : segments) { + new CubeStatsReader(seg, config).print(out); + } + out.flush(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java index 3f46128..a1582cb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -37,7 +37,12 @@ import java.util.Map; public class CuboidStatsUtil { - public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // + Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0); + } + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // + Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION); List<Long> allCuboids = new ArrayList<Long>(); @@ -47,8 +52,12 @@ public class CuboidStatsUtil { ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); try { - // persist the sample percentage with key 0 + // mapper overlap ratio at key -1 + writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); + + // sampling percentage at key 0 writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage))); + for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); @@ -59,4 +68,5 @@ public class CuboidStatsUtil { IOUtils.closeQuietly(writer); } } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 71adaa9..e4c8aff 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -49,7 +49,7 @@ import com.google.common.collect.Maps; */ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWritable, Text> { - private List<TblColRef> columnList = new ArrayList(); + private List<TblColRef> columnList = new ArrayList<TblColRef>(); private boolean collectStatistics = false; private String statisticsOutput = null; private List<Long> baseCuboidRowCountInMappers; @@ -57,7 +57,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri protected long baseCuboidId; protected CubeDesc cubeDesc; private long totalRowsBeforeMerge = 0; - private int SAMPING_PERCENTAGE = 100; + private int samplingPercentage = 100; private List<ByteArray> colValues; private long currentColIndex = -1; @@ -78,7 +78,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri if (collectStatistics) { baseCuboidRowCountInMappers = Lists.newArrayList(); cuboidHLLMap = Maps.newHashMap(); - SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100")); + samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100")); } colValues = Lists.newArrayList(); @@ -159,8 +159,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri //output the hll info; if (collectStatistics) { + long grandTotal = 0; + for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { + grandTotal += hll.getCountEstimate(); + } + double mapperOverlapRatio = (double) totalRowsBeforeMerge / grandTotal; + writeMapperAndCuboidStatistics(context); // for human check - CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob + CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // + cuboidHLLMap, samplingPercentage, mapperOverlapRatio); // for CreateHTableJob } } @@ -178,7 +185,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri msg = "Total cuboid number: \t" + allCuboids.size(); writeLine(out, msg); - msg = "Samping percentage: \t" + SAMPING_PERCENTAGE; + msg = "Samping percentage: \t" + samplingPercentage; writeLine(out, msg); writeLine(out, "The following statistics are collected based sampling data."); @@ -203,7 +210,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri writeLine(out, msg); if (grantTotal > 0) { - msg = "The compaction factor is: \t" + totalRowsBeforeMerge / grantTotal; + msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal; writeLine(out, msg); } http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 95eb725..053939e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -18,57 +18,31 @@ package org.apache.kylin.engine.mr.steps; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.List; import java.util.Map; -import javax.annotation.Nullable; - import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - /** */ public class InMemCuboidJob extends AbstractHadoopJob { @@ -144,10 +118,9 @@ public class InMemCuboidJob extends AbstractHadoopJob { } private int calculateReducerNum(CubeSegment cubeSeg) throws IOException { - Configuration jobConf = job.getConfiguration(); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Map<Long, Double> cubeSizeMap = getCubeSizeMapFromCuboidStatistics(cubeSeg, kylinConfig, jobConf); + Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap(); double totalSizeInM = 0; for (Double cuboidSize : cubeSizeMap.values()) { totalSizeInM += cuboidSize; @@ -169,128 +142,6 @@ public class InMemCuboidJob extends AbstractHadoopJob { return numReduceTasks; } - public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException { - ResourceStore rs = ResourceStore.getStore(kylinConfig); - String fileKey = cubeSegment.getStatisticsResourcePath(); - InputStream is = rs.getResource(fileKey).inputStream; - File tempFile = null; - FileOutputStream tempFileStream = null; - try { - tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq"); - tempFileStream = new FileOutputStream(tempFile); - org.apache.commons.io.IOUtils.copy(is, tempFileStream); - } finally { - IOUtils.closeStream(is); - IOUtils.closeStream(tempFileStream); - } - Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap(); - - FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath()); - int samplingPercentage = 25; - SequenceFile.Reader reader = null; - try { - reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf); - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - samplingPercentage = Bytes.toInt(value.getBytes()); - } else { - HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - counterMap.put(key.get(), hll); - } - - } - } catch (Exception e) { - e.printStackTrace(); - throw e; - } finally { - IOUtils.closeStream(reader); - tempFile.delete(); - } - return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage); - } - - public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException { - Preconditions.checkArgument(samplingPercentage > 0); - return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() { - @Nullable - @Override - public Long apply(HyperLogLogPlusCounter input) { - return input.getCountEstimate() * 100 / samplingPercentage; - } - }); - } - - // return map of Cuboid ID => MB - public static Map<Long, Double> getCubeSizeMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException { - Map<Long, Long> rowCountMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf); - Map<Long, Double> sizeMap = getCubeSizeMapFromRowCount(cubeSegment, rowCountMap); - return sizeMap; - } - - public static Map<Long, Double> getCubeSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { - final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); - final List<Integer> rowkeyColumnSize = Lists.newArrayList(); - final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - final List<TblColRef> columnList = baseCuboid.getColumns(); - - for (int i = 0; i < columnList.size(); i++) { - rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i))); - } - - Map<Long, Double> sizeMap = Maps.newHashMap(); - for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) { - sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize)); - } - return sizeMap; - } - - /** - * Estimate the cuboid's size - * - * @return the cuboid size in M bytes - */ - private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { - - int bytesLength = cubeSegment.getRowKeyPreambleSize(); - - long mask = Long.highestOneBit(baseCuboidId); - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId); - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & cuboidId) > 0) { - bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i)); - } - mask = mask >> 1; - } - - // add the measure length - int space = 0; - boolean isMemoryHungry = false; - for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { - if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) { - isMemoryHungry = true; - } - DataType returnType = measureDesc.getFunction().getReturnDataType(); - space += returnType.getStorageBytesEstimate(); - } - bytesLength += space; - - double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L); - if (isMemoryHungry) { - logger.info("Cube is memory hungry, storage size estimation multiply 0.05"); - ret *= 0.05; - } else { - logger.info("Cube is not memory hungry, storage size estimation multiply 0.25"); - ret *= 0.25; - } - logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M."); - return ret; - } - public static void main(String[] args) throws Exception { InMemCuboidJob job = new InMemCuboidJob(); int exitCode = ToolRunner.run(job, args); http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 59a19d3..8737173 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -56,6 +56,7 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.*; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.*; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; @@ -454,8 +455,8 @@ public class SparkCubing extends AbstractApplication { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); - final Map<Long, Long> rowCountMap = InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100); - final Map<Long, Double> cubeSizeMap = InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap); + final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100); + final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap); System.out.println("cube size estimation:" + cubeSizeMap); final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment); CubeHTableUtil.createHTable(cubeSegment, splitKeys); http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java index a6b8a9f..9609442 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java @@ -92,7 +92,7 @@ public class StreamingMonitor { private static List<CubeSegment> getSortedReadySegments(String cubeName) { final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); Preconditions.checkNotNull(cube); - final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY); + final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY); logger.info("totally " + segments.size() + " cubeSegments"); Collections.sort(segments); return segments; http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/server/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index 4f8e06f..b8e4cda 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -588,7 +588,7 @@ public class CubeService extends BasicService { if (desc.getRetentionRange() > 0) { synchronized (CubeService.class) { cube = getCubeManager().getCube(cubeName); - List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY); + List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY); long currentRange = 0; int position = readySegs.size() - 1; while (position >= 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/server/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java index a19e4f1..155593a 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -206,7 +206,7 @@ public class JobService extends BasicService { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { - List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY); + List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY); boolean segExists = false; for (CubeSegment aSeg : readySegs) { if (aSeg.getDateRangeStart() == startDate && aSeg.getDateRangeEnd() == endDate) { http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 85c9200..023c69d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -46,6 +46,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.mr.common.CuboidShardUtil; import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.metadata.model.DataModelDesc; @@ -94,7 +95,7 @@ public class CreateHTableJob extends AbstractHadoopJob { try { byte[][] splitKeys; if (statsEnabled) { - final Map<Long, Double> cuboidSizeMap = InMemCuboidJob.getCubeSizeMapFromCuboidStatistics(cubeSegment, kylinConfig, conf); + final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap(); splitKeys = getSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment); } else { splitKeys = getSplits(conf, partitionFilePath);