Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 1410d9d30 -> db95d72ca


KYLIN-1245 Save 'mapper overlap ratio' in cube stats
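
The "mapper overlap ratio" is, roughly, the total number of rows the mappers
processed before the per-mapper HyperLogLog counters were merged, divided by
the summed distinct-row estimate after the merge (see the computation added
to FactDistinctColumnsReducer below). As a worked example: if three mappers
together contribute 3,000,000 rows and the merged estimate is 1,200,000
distinct rows, the ratio is 3000000 / 1200000 = 2.5, i.e. each distinct row
was seen by about 2.5 mappers on average; a ratio near 1.0 means the mappers
saw mostly disjoint data.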


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/db95d72c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/db95d72c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/db95d72c

Branch: refs/heads/2.x-staging
Commit: db95d72caa801151824177c613acc91c5549f478
Parents: 1410d9d
Author: Li, Yang <yang...@ebay.com>
Authored: Wed Dec 23 14:02:17 2015 +0800
Committer: Li, Yang <yang...@ebay.com>
Committed: Wed Dec 23 14:03:25 2015 +0800

----------------------------------------------------------------------
 build/conf/kylin_hive_conf.xml                  |   2 +-
 .../org/apache/kylin/cube/CubeInstance.java     |  10 -
 .../java/org/apache/kylin/cube/CubeManager.java |   6 +-
 .../org/apache/kylin/cube/CubeSegmentsTest.java |   2 +-
 .../kylin/engine/mr/common/CubeStatsReader.java | 227 +++++++++++++++++++
 .../kylin/engine/mr/common/CuboidStatsUtil.java |  14 +-
 .../mr/steps/FactDistinctColumnsReducer.java    |  19 +-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 153 +------------
 .../apache/kylin/engine/spark/SparkCubing.java  |   5 +-
 .../streaming/monitor/StreamingMonitor.java     |   2 +-
 .../apache/kylin/rest/service/CubeService.java  |   2 +-
 .../apache/kylin/rest/service/JobService.java   |   2 +-
 .../storage/hbase/steps/CreateHTableJob.java    |   3 +-
 13 files changed, 267 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/build/conf/kylin_hive_conf.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml
index afa53f7..f91f489 100644
--- a/build/conf/kylin_hive_conf.xml
+++ b/build/conf/kylin_hive_conf.xml
@@ -8,7 +8,7 @@
 
     <property>
         <name>dfs.block.size</name>
-        <value>10485760</value>
+        <value>32000000</value>
        <description>Want more mappers for in-mem cubing, thus smaller the DFS block size</description>
     </property>
 

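A note on the value above: the number of mappers is roughly the input size
divided by dfs.block.size, so raising the block size from 10485760 bytes
(10 MB) to 32000000 bytes (about 30.5 MB) cuts the mapper count for the same
input to roughly a third, which should also lower the mapper overlap ratio
that this commit starts recording.
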
http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index dccc3f1..8363a2b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -299,16 +299,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return result;
     }
 
-    public List<CubeSegment> getSegment(SegmentStatusEnum status) {
-        List<CubeSegment> result = Lists.newArrayList();
-        for (CubeSegment segment : segments) {
-            if (segment.getStatus() == status) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
     public CubeSegment getSegment(String name, SegmentStatusEnum status) {
         for (CubeSegment segment : segments) {
             if ((null != segment.getName() && segment.getName().equals(name)) && segment.getStatus() == status) {

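(The getSegment(status) overload removed here is presumably redundant with an
existing getSegments(status) on CubeInstance; every caller in the hunks below
is switched to that spelling.)
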
http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 88904bb..8e4906b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -271,7 +271,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private boolean validateReadySegments(CubeInstance cube) {
-        final List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+        final List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
         if (readySegments.size() == 0) {
             return true;
         }
@@ -475,7 +475,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) {
-        List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+        List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
         if (readySegments.isEmpty()) {
             throw new IllegalStateException("there are no segments in ready state");
         }
@@ -610,7 +610,7 @@ public class CubeManager implements IRealizationProvider {
             return null;
         }
 
-        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
 
         if (readySegments.size() == 0) {
             logger.debug("Cube " + cube.getName() + " has no ready segment to merge");

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 6d1d39b..c1f55d1 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -65,7 +65,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
 
         // assert one ready segment
         assertEquals(1, cube.getSegments().size());
-        CubeSegment seg = cube.getSegment(SegmentStatusEnum.READY).get(0);
+        CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
         assertEquals(SegmentStatusEnum.READY, seg.getStatus());
 
         // append again, for non-partitioned cube, it becomes a full refresh

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
new file mode 100644
index 0000000..fc27a81
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Reader.Option;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This should be in the cube module. It is in engine-mr for now because stats
+ * are saved as sequence files, which brings in a Hadoop dependency.
+ */
+public class CubeStatsReader {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStatsReader.class);
+
+    final CubeSegment seg;
+    final int samplingPercentage;
+    final double mapperOverlapRatioOfFirstBuild;
+    final Map<Long, HyperLogLogPlusCounter> cuboidRowCountMap;
+
+    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
+        ResourceStore store = ResourceStore.getStore(kylinConfig);
+        String statsKey = cubeSegment.getStatisticsResourcePath();
+        InputStream is = store.getResource(statsKey).inputStream;
+        Reader reader = null;
+
+        try {
+            Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
+
+            Option streamInput = SequenceFile.Reader.stream(new FSDataInputStream(is));
+            reader = new SequenceFile.Reader(hadoopConf, streamInput);
+
+            int percentage = 100;
+            double mapperOverlapRatio = 0;
+            Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
+
+            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
+            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
+            while (reader.next(key, value)) {
+                if (key.get() == 0L) {
+                    percentage = Bytes.toInt(value.getBytes());
+                } else if (key.get() == -1) {
+                    mapperOverlapRatio = Bytes.toDouble(value.getBytes());
+                } else {
+                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                    ByteArray byteArray = new ByteArray(value.getBytes());
+                    hll.readRegisters(byteArray.asBuffer());
+                    counterMap.put(key.get(), hll);
+                }
+            }
+
+            this.seg = cubeSegment;
+            this.samplingPercentage = percentage;
+            this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
+            this.cuboidRowCountMap = counterMap;
+
+        } finally {
+            IOUtils.closeStream(reader);
+            IOUtils.closeStream(is);
+        }
+    }
+
+    public Map<Long, Long> getCuboidRowCountMap() {
+        return getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage);
+    }
+
+    // return map of Cuboid ID => MB
+    public Map<Long, Double> getCuboidSizeMap() {
+        return getCuboidSizeMapFromRowCount(seg, getCuboidRowCountMap());
+    }
+
+    public void print(PrintWriter out) {
+        Map<Long, Long> rowCountMap = getCuboidRowCountMap();
+        Map<Long, Double> sizeMap = getCuboidSizeMap();
+        List<Long> cuboids = new ArrayList<Long>(rowCountMap.keySet());
+        Collections.sort(cuboids);
+
+        out.println("============================================================================");
+        out.println("Statistics of " + seg);
+        out.println("  Sampling percentage:  " + samplingPercentage);
+        out.println("  Mapper overlap ratio: " + mapperOverlapRatioOfFirstBuild);
+        for (Long cuboid : cuboids) {
+            out.println("  Cuboid :\t" + rowCountMap.get(cuboid) + " rows, " + sizeMap.get(cuboid) + " MB");
+        }
+        out.println("----------------------------------------------------------------------------");
+    }
+
+    public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HyperLogLogPlusCounter> hllcMap, int samplingPercentage) {
+        return Maps.transformValues(hllcMap, new Function<HyperLogLogPlusCounter, Long>() {
+            @Nullable
+            @Override
+            public Long apply(HyperLogLogPlusCounter input) {
+                // No need to adjust according to the sampling percentage. The assumption is that the data set is far
+                // larger than its cardinality, so even a percentage of the data should already see all cardinalities.
+                return input.getCountEstimate();
+            }
+        });
+    }
+
+    public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+        final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        final List<Integer> rowkeyColumnSize = Lists.newArrayList();
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        final List<TblColRef> columnList = baseCuboid.getColumns();
+
+        for (int i = 0; i < columnList.size(); i++) {
+            rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
+        }
+
+        Map<Long, Double> sizeMap = Maps.newHashMap();
+        for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
+            sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
+        }
+        return sizeMap;
+    }
+
+    /**
+     * Estimate the cuboid's size
+     *
+     * @return the cuboid size in M bytes
+     */
+    private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
+
+        int bytesLength = cubeSegment.getRowKeyPreambleSize();
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & cuboidId) > 0) {
+                bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
+            }
+            mask = mask >> 1;
+        }
+
+        // add the measure length
+        int space = 0;
+        boolean isMemoryHungry = false;
+        for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+            if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
+                isMemoryHungry = true;
+            }
+            DataType returnType = measureDesc.getFunction().getReturnDataType();
+            space += returnType.getStorageBytesEstimate();
+        }
+        bytesLength += space;
+
+        double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
+        if (isMemoryHungry) {
+            logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
+            ret *= 0.05;
+        } else {
+            logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
+            ret *= 0.25;
+        }
+        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
+        return ret;
+    }
+    
+    public static void main(String[] args) throws IOException {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        CubeInstance cube = CubeManager.getInstance(config).getCube(args[0]);
+        List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
+        
+        PrintWriter out = new PrintWriter(System.out);
+        for (CubeSegment seg : segments) {
+            new CubeStatsReader(seg, config).print(out);
+        }
+        out.flush();
+    }
+
+}
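
A minimal sketch (not part of the diff above) of how the new reader is meant
to be consumed, mirroring the calculateReducerNum() change in InMemCuboidJob
below; the 100 MB-per-reducer figure is an assumed illustration, not a real
Kylin setting:

    // sum the per-cuboid size estimates, then derive a reducer count from them
    CubeStatsReader statsReader = new CubeStatsReader(cubeSegment, kylinConfig);
    double totalSizeInM = 0; // estimated cube size in MB
    for (Double cuboidSize : statsReader.getCuboidSizeMap().values()) {
        totalSizeInM += cuboidSize;
    }
    // assumed sizing rule: one reducer per 100 MB of estimated output
    int numReduceTasks = Math.max(1, (int) Math.ceil(totalSizeInM / 100d));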

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
index 3f46128..a1582cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -37,7 +37,12 @@ import java.util.Map;
 
 public class CuboidStatsUtil {
 
-    public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+            Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0);
+    }
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+            Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
 
         List<Long> allCuboids = new ArrayList<Long>();
@@ -47,8 +52,12 @@ public class CuboidStatsUtil {
         ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
         try {
-            // persist the sample percentage with key 0
+            // mapper overlap ratio at key -1
+            writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)));
+            
+            // sampling percentage at key 0
             writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+            
             for (long i : allCuboids) {
                 valueBuf.clear();
                 cuboidHLLMap.get(i).writeRegisters(valueBuf);
@@ -59,4 +68,5 @@ public class CuboidStatsUtil {
             IOUtils.closeQuietly(writer);
         }
     }
+
 }
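
Taken together with CubeStatsReader above, the statistics sequence file now
has a simple keyed layout (matching the key dispatch in the reader):

    key -1 -> mapper overlap ratio, stored as a double (new in this commit)
    key  0 -> sampling percentage, stored as an int
    key  N -> cuboid id N, value = that cuboid's serialized HyperLogLog registers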

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 71adaa9..e4c8aff 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -49,7 +49,7 @@ import com.google.common.collect.Maps;
  */
 public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWritable, Text> {
 
-    private List<TblColRef> columnList = new ArrayList();
+    private List<TblColRef> columnList = new ArrayList<TblColRef>();
     private boolean collectStatistics = false;
     private String statisticsOutput = null;
     private List<Long> baseCuboidRowCountInMappers;
@@ -57,7 +57,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
     private long totalRowsBeforeMerge = 0;
-    private int SAMPING_PERCENTAGE = 100;
+    private int samplingPercentage = 100;
     private List<ByteArray> colValues;
     private long currentColIndex = -1;
 
@@ -78,7 +78,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
         if (collectStatistics) {
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
-            SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100"));
+            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100"));
         }
 
         colValues = Lists.newArrayList();
@@ -159,8 +159,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
         
         //output the hll info;
         if (collectStatistics) {
+            long grandTotal = 0;
+            for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+            double mapperOverlapRatio = (double) totalRowsBeforeMerge / grandTotal;
+            
             writeMapperAndCuboidStatistics(context); // for human check
-            CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
+            CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                    cuboidHLLMap, samplingPercentage, mapperOverlapRatio); // for CreateHTableJob
         }
     }
 
@@ -178,7 +185,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
             msg = "Total cuboid number: \t" + allCuboids.size();
             writeLine(out, msg);
-            msg = "Samping percentage: \t" + SAMPING_PERCENTAGE;
+            msg = "Sampling percentage: \t" + samplingPercentage;
             writeLine(out, msg);
 
             writeLine(out, "The following statistics are collected based sampling data.");
@@ -203,7 +210,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             writeLine(out, msg);
 
             if (grantTotal > 0) {
-                msg = "The compaction factor is: \t" + totalRowsBeforeMerge / grantTotal;
+                msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal;
                 writeLine(out, msg);
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 95eb725..053939e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -18,57 +18,31 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 /**
  */
 public class InMemCuboidJob extends AbstractHadoopJob {
@@ -144,10 +118,9 @@ public class InMemCuboidJob extends AbstractHadoopJob {
     }
 
     private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
-        Configuration jobConf = job.getConfiguration();
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         
-        Map<Long, Double> cubeSizeMap = getCubeSizeMapFromCuboidStatistics(cubeSeg, kylinConfig, jobConf);
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
@@ -169,128 +142,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         return numReduceTasks;
     }
 
-    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
-        ResourceStore rs = ResourceStore.getStore(kylinConfig);
-        String fileKey = cubeSegment.getStatisticsResourcePath();
-        InputStream is = rs.getResource(fileKey).inputStream;
-        File tempFile = null;
-        FileOutputStream tempFileStream = null;
-        try {
-            tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
-            tempFileStream = new FileOutputStream(tempFile);
-            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-        } finally {
-            IOUtils.closeStream(is);
-            IOUtils.closeStream(tempFileStream);
-        }
-        Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
-
-        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-        int samplingPercentage = 25;
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                if (key.get() == 0L) {
-                    samplingPercentage = Bytes.toInt(value.getBytes());
-                } else {
-                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
-                    counterMap.put(key.get(), hll);
-                }
-
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        } finally {
-            IOUtils.closeStream(reader);
-            tempFile.delete();
-        }
-        return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
-    }
-    
-    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
-        Preconditions.checkArgument(samplingPercentage > 0);
-        return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() {
-            @Nullable
-            @Override
-            public Long apply(HyperLogLogPlusCounter input) {
-                return input.getCountEstimate() * 100 / samplingPercentage;
-            }
-        });
-    }
-
-    // return map of Cuboid ID => MB
-    public static Map<Long, Double> getCubeSizeMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
-        Map<Long, Long> rowCountMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
-        Map<Long, Double> sizeMap = getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
-        return sizeMap;
-    }
-
-    public static Map<Long, Double> getCubeSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
-        final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        final List<Integer> rowkeyColumnSize = Lists.newArrayList();
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        final List<TblColRef> columnList = baseCuboid.getColumns();
-
-        for (int i = 0; i < columnList.size(); i++) {
-            rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
-        }
-
-        Map<Long, Double> sizeMap = Maps.newHashMap();
-        for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
-            sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
-        }
-        return sizeMap;
-    }
-
-    /**
-     * Estimate the cuboid's size
-     *
-     * @return the cuboid size in M bytes
-     */
-    private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
-
-        int bytesLength = cubeSegment.getRowKeyPreambleSize();
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & cuboidId) > 0) {
-                bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
-            }
-            mask = mask >> 1;
-        }
-
-        // add the measure length
-        int space = 0;
-        boolean isMemoryHungry = false;
-        for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
-            if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
-                isMemoryHungry = true;
-            }
-            DataType returnType = measureDesc.getFunction().getReturnDataType();
-            space += returnType.getStorageBytesEstimate();
-        }
-        bytesLength += space;
-
-        double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
-        if (isMemoryHungry) {
-            logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
-            ret *= 0.05;
-        } else {
-            logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
-            ret *= 0.25;
-        }
-        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
-        return ret;
-    }
-
     public static void main(String[] args) throws Exception {
         InMemCuboidJob job = new InMemCuboidJob();
         int exitCode = ToolRunner.run(job, args);

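Nothing is lost in the big deletion above: getCubeRowCountMapFromCuboidStatistics,
getCubeSizeMapFromRowCount and estimateCuboidStorageSize move into the new
CubeStatsReader (as getCuboidRowCountMapFromSampling and
getCuboidSizeMapFromRowCount), with one behavioral change: the row-count
estimate is no longer scaled up by the sampling percentage, for the reason
given in the comment inside CubeStatsReader.
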
http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 59a19d3..8737173 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -56,6 +56,7 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.*;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
@@ -454,8 +455,8 @@ public class SparkCubing extends AbstractApplication {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Map<Long, Long> rowCountMap = InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
-        final Map<Long, Double> cubeSizeMap = InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
+        final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
+        final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
         System.out.println("cube size estimation:" + cubeSizeMap);
         final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
index a6b8a9f..9609442 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -92,7 +92,7 @@ public class StreamingMonitor {
     private static List<CubeSegment> getSortedReadySegments(String cubeName) {
         final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
         Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
         logger.info("totally " + segments.size() + " cubeSegments");
         Collections.sort(segments);
         return segments;

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4f8e06f..b8e4cda 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -588,7 +588,7 @@ public class CubeService extends BasicService {
         if (desc.getRetentionRange() > 0) {
             synchronized (CubeService.class) {
                 cube = getCubeManager().getCube(cubeName);
-                List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY);
+                List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
                 long currentRange = 0;
                 int position = readySegs.size() - 1;
                 while (position >= 0) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index a19e4f1..155593a 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -206,7 +206,7 @@ public class JobService extends BasicService {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg);
             job = EngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
-            List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY);
+            List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
             boolean segExists = false;
             for (CubeSegment aSeg : readySegs) {
                 if (aSeg.getDateRangeStart() == startDate && aSeg.getDateRangeEnd() == endDate) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/db95d72c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 85c9200..023c69d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -46,6 +46,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -94,7 +95,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         try {
             byte[][] splitKeys;
             if (statsEnabled) {
-                final Map<Long, Double> cuboidSizeMap = InMemCuboidJob.getCubeSizeMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
+                final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
                 splitKeys = getSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment);
             } else {
                 splitKeys = getSplits(conf, partitionFilePath);
