Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2814 abd13082b -> 3f80488e3 (forced update)


KYLIN-2811, refine spark cubing

Signed-off-by: Hongbin Ma <mahong...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a1c234a9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a1c234a9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a1c234a9

Branch: refs/heads/KYLIN-2814
Commit: a1c234a9afbc5a30306f4275127649f980ab75bd
Parents: 4316cfd
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Sun Aug 27 13:29:12 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Sun Aug 27 15:08:18 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  50 +--
 .../metadata/model/ComputedColumnDesc.java      |  10 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../spark/SparkBatchCubingJobBuilder2.java      |   1 +
 .../kylin/engine/spark/SparkCubingByLayer.java  | 380 ++++++++++++-------
 .../localmeta/cube_desc/ci_inner_join_cube.json |   2 +-
 .../kylin/rest/util/Log4jConfigListener.java    |   1 +
 7 files changed, 267 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index a56e9b8..1d5e0ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,6 +18,15 @@
 
 package org.apache.kylin.common;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -34,26 +43,15 @@ import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
 /**
  */
 public class KylinConfig extends KylinConfigBase {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
 
-    /** Kylin properties file name */
+    /**
+     * Kylin properties file name
+     */
     public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
     public static final String KYLIN_CONF = "KYLIN_CONF";
 
@@ -62,7 +60,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // thread-local instances, will override SYS_ENV_INSTANCE
     private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
-    
+
     static {
         /*
          * Make Calcite to work with Unicode.
@@ -121,9 +119,6 @@ public class KylinConfig extends KylinConfigBase {
     }
 
     private static UriType decideUriType(String metaUri) {
-        if (metaUri.indexOf("@hdfs") > 0) {
-            return UriType.HDFS_FILE;
-        }
 
         try {
             File file = new File(metaUri);
@@ -163,23 +158,6 @@ public class KylinConfig extends KylinConfigBase {
          */
         UriType uriType = decideUriType(uri);
 
-        if (uriType == UriType.HDFS_FILE) {
-            KylinConfig config;
-            FileSystem fs;
-            int cut = uri.indexOf('@');
-            String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE;
-            try {
-                config = new KylinConfig();
-                fs = HadoopUtil.getFileSystem(realHdfsPath);
-                InputStream is = fs.open(new Path(realHdfsPath));
-                Properties prop = streamToProps(is);
-                config.reloadKylinConfig(prop);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return config;
-        }
-
         if (uriType == UriType.LOCAL_FOLDER) {
             KylinConfig config = new KylinConfig();
             config.setMetadataUrl(uri);
@@ -241,7 +219,7 @@ public class KylinConfig extends KylinConfigBase {
     public static void setKylinConfigThreadLocal(KylinConfig config) {
         THREAD_ENV_INSTANCE.set(config);
     }
-    
+
     public static void removeKylinConfigThreadLocal() {
         THREAD_ENV_INSTANCE.remove();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index e8cc351..2ee2b38 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -17,12 +17,13 @@
 */
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+
+import org.apache.kylin.metadata.model.tool.CalciteParser;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-
-import java.io.Serializable;
 
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class ComputedColumnDesc implements Serializable {
@@ -46,7 +47,8 @@ public class ComputedColumnDesc implements Serializable {
         tableIdentity = tableIdentity.toUpperCase();
         columnName = columnName.toUpperCase();
 
-        CalciteParser.ensureNoTableNameExists(expression);
+        if ("true".equals(System.getProperty("needCheckCC")))
+            CalciteParser.ensureNoTableNameExists(expression);
     }
 
     public String getFullName() {

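The expression validation above is now gated behind the needCheckCC system property, which only the Kylin server sets at startup (see the Log4jConfigListener change later in this commit); Spark executors never set it and therefore skip the Calcite parse. A minimal, hypothetical sketch of the same guard pattern, where CalciteParser is Kylin's and everything else is invented for illustration:

    // Hypothetical illustration of the guard above; only the property check and
    // the gated call mirror the real code, the rest is made up for the example.
    public class CcCheckGateExample {

        static void initExpression(String expression) {
            // Only validate when the JVM loading the model is a Kylin server,
            // which sets -DneedCheckCC=true; Spark executors leave it unset and
            // skip the Calcite-based parse entirely.
            if ("true".equals(System.getProperty("needCheckCC"))) {
                ensureNoTableNameExists(expression); // stands in for CalciteParser.ensureNoTableNameExists
            }
        }

        // Crude stand-in for the real Calcite check, for demonstration only.
        static void ensureNoTableNameExists(String expression) {
            if (expression.contains(".")) {
                throw new IllegalArgumentException("computed column must not use qualified names: " + expression);
            }
        }

        public static void main(String[] args) {
            initExpression("PRICE * ITEM_COUNT");           // no property set: check skipped
            System.setProperty("needCheckCC", "true");
            initExpression("PRICE * ITEM_COUNT");           // checked, passes
            // initExpression("TEST_KYLIN_FACT.PRICE * 2"); // checked, would throw
        }
    }
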
http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 0cb23ac..1ca7024 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -63,6 +63,7 @@ public interface BatchConstants {
     String CFG_OUTPUT_DICT = "dict";
     String CFG_OUTPUT_STATISTICS = "statistics";
     String CFG_OUTPUT_PARTITION = "partition";
+    String CFG_MR_SPARK_JOB = "mr.spark.job";
 
     /**
      * command line ARGuments

http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 07bc334..f1e6aea 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -49,6 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();

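For context, the new confPath argument carries the directory of the local kylin.properties into the Spark application, where the driver then ships that file to every executor. A rough, stand-alone sketch of the driver-side hand-off; the class name, app name, and the /etc/kylin fallback are placeholders, and only SparkConf, JavaSparkContext.addFile, and the kylin.properties file name come from this diff:

    import java.io.File;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical driver showing how a conf directory passed on the command line
    // ends up distributed to executors alongside the job.
    public class ConfShipExample {
        public static void main(String[] args) {
            String confPath = args.length > 0 ? args[0] : "/etc/kylin"; // placeholder default

            SparkConf conf = new SparkConf().setAppName("conf-ship-example");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Executors can later resolve the same file via SparkFiles.get("kylin.properties").
                sc.addFile(confPath + File.separator + "kylin.properties");
                // ... build RDDs and run the cubing steps here ...
            }
        }
    }
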
http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index a8e7378..a3a6ad0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,18 +17,28 @@
 */
 package org.apache.kylin.engine.spark;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -41,6 +51,8 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
@@ -50,27 +62,22 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import scala.Tuple2;
 
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
@@ -79,13 +86,21 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
 
-    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl");
-    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
-    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
+            .withDescription("Configuration Path").create("confPath");
 
     private Options options;
+    private static String metaUrl;
 
     public SparkCubingByLayer() {
         options = new Options();
@@ -94,6 +109,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_CONF_PATH);
     }
 
     @Override
@@ -101,41 +117,44 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig loadKylinConfig(String metaUrl) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.createInstanceFromUri(metaUrl);
-        KylinConfig.setKylinConfigThreadLocal(kylinConfig);
-        return kylinConfig;
+    public static KylinConfig getKylinConfigForExecutor() {
+        File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
+        String confPath = file.getParentFile().getAbsolutePath();
+        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setMetadataUrl(metaUrl);
+        return config;
     }
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
-        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
-        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
-        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
+                Class.forName("scala.reflect.ClassTag$$anon$1"), java.lang.Class.class };
 
         SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
-        conf.set("spark.kryo.registrationRequired", "true");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
         JavaSparkContext sc = new JavaSparkContext(conf);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final KylinConfig kylinConfig = loadKylinConfig(metaUrl);
 
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.table(hiveTable);
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        sc.addFile(confPath + File.separator + KylinConfig.KYLIN_CONF_PROPERTIES_FILE);
+        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
-        final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
-        final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
-        final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
-        final int measureNum = cubeDesc.getMeasures().size();
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -144,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 countMeasureIndex++;
             }
         }
-        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
         boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
         boolean allNormalMeasure = true;
         for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
@@ -153,141 +172,212 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
-        // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
-            volatile transient boolean initialized = false;
-            BaseCuboidBuilder baseCuboidBuilder = null;
-
-            @Override
-            public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
-                if (initialized == false) {
-                    synchronized (SparkCubingByLayer.class) {
-                        if (initialized == false) {
-                            loadKylinConfig(metaUrl);
-                            long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                            Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-                            baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
-                            initialized = true;
-                        }
-                    }
-                }
-
-                String[] rowArray = rowToArray(row);
-                baseCuboidBuilder.resetAggrs();
-                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
-                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
-                return new Tuple2<>(new ByteArray(rowKey), result);
-            }
 
-            private String[] rowToArray(Row row) {
-                String[] result = new String[row.size()];
-                for (int i = 0; i < row.size(); i++) {
-                    final Object o = row.get(i);
-                    if (o != null) {
-                        result[i] = o.toString();
-                    } else {
-                        result[i] = null;
-                    }
-                }
-                return result;
-            }
+        HiveContext sqlContext = new HiveContext(sc.sc());
+        final DataFrame intermediateTable = sqlContext.table(hiveTable);
 
-        });
+        // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId));
 
-        logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions());
         Long totalCount = 0L;
-        if (kylinConfig.isSparkSanityCheckEnabled()) {
+        if (envConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
-            logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count());
         }
-        final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
+
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(cubeName, needAggr);
         }
+
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
+        long baseRDDSize = SizeEstimator.estimate(encodedBaseRDD) / (1024 * 1024);
+        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) baseRDDSize);
+
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-        saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
+
+        saveToHDFS(allRDDs[0], cubeName, cubeSegment, outputPath, 0, confOverwrite);
+
         // aggregate to ND cuboids
-        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
         for (level = 1; level <= totalLevels; level++) {
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
-            logger.info("Level " + level + " partition number: " + partition);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);
-            if (kylinConfig.isSparkSanityCheckEnabled() == true) {
+            long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
+            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId))
+                    .reduceByKey(reducerFunction2, partition).persist(storageLevel);
+            if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], cubeName, cubeSegment, outputPath, level, confOverwrite);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
-
         deleteHDFSMeta(metaUrl);
     }
 
-    private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
-        double baseCuboidSize = statsReader.estimateLayerSize(level);
+    private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig,
+            int rddSize) {
+        int baseCuboidSize = (int) Math.min(rddSize, statsReader.estimateLayerSize(level));
         float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
         int partition = (int) (baseCuboidSize / rddCut);
         partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
         partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
-        logger.debug("Estimated level " + level + " partition number: " + partition);
         return partition;
     }
 
-    private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
+    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName,
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-        rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, 
org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-            BufferedMeasureCodec codec = new 
BufferedMeasureCodec(cubeDesc.getMeasures());
 
-            @Override
-            public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws 
Exception {
-                ByteBuffer valueBuf = codec.encode(tuple2._2());
-                byte[] encodedBytes = new byte[valueBuf.position()];
-                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
-                return new Tuple2<>(new 
org.apache.hadoop.io.Text(tuple2._1().array()), new 
org.apache.hadoop.io.Text(encodedBytes));
-            }
-        }).saveAsNewAPIHadoopFile(cuboidOutputPath, 
org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, 
SequenceFileOutputFormat.class, conf);
+        Job job = Job.getInstance(conf);
+        IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
+        outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeSeg.getCubeInstance().getName());
+        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, cubeSeg.getUuid());
+        job.getConfiguration().set(BatchConstants.CFG_MR_SPARK_JOB, "spark");
+
+        rdd.mapToPair(
+                new PairFunction<Tuple2<ByteArray, Object[]>, 
org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+                    private volatile transient boolean initialized = false;
+                    BufferedMeasureCodec codec;
+
+                    @Override
+                    public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(
+                            Tuple2<ByteArray, Object[]> tuple2) throws 
Exception {
+                        if (!initialized) {
+                            synchronized (SparkCubingByLayer.class) {
+                                if (!initialized) {
+                                    KylinConfig kylinConfig = 
getKylinConfigForExecutor();
+                                    CubeDesc desc = 
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                    codec = new 
BufferedMeasureCodec(desc.getMeasures());
+                                    initialized = true;
+                                }
+                            }
+                        }
+                        ByteBuffer valueBuf = codec.encode(tuple2._2());
+                        byte[] encodedBytes = new byte[valueBuf.position()];
+                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
+                        return new Tuple2<>(new 
org.apache.hadoop.io.Text(tuple2._1().array()),
+                                new org.apache.hadoop.io.Text(encodedBytes));
+                    }
+                
}).sortByKey().saveAsNewAPIHadoopDataset(job.getConfiguration());
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
-    class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], 
Object[]> {
-        CubeDesc cubeDesc;
-        int measureNum;
-        MeasureAggregators aggregators;
+    static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, 
Object[]> {
+        private volatile transient boolean initialized = false;
+        private BaseCuboidBuilder baseCuboidBuilder = null;
+        private String cubeName;
+        private String segmentId;
+
+        public EncodeBaseCuboid(String cubeName, String segmentId) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+        }
+
+        @Override
+        public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkCubingByLayer.class) {
+                    if (initialized == false) {
+                        KylinConfig kConfig = getKylinConfigForExecutor();
+                        CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+                        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                        CubeSegment cubeSegment = 
cubeInstance.getSegmentById(segmentId);
+                        CubeJoinedFlatTableEnrich interDesc = new 
CubeJoinedFlatTableEnrich(
+                                
EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                        Cuboid baseCuboid = Cuboid.findById(cubeDesc, 
baseCuboidId);
+                        baseCuboidBuilder = new BaseCuboidBuilder(kConfig, 
cubeDesc, cubeSegment, interDesc,
+                                
AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                                
MeasureIngester.create(cubeDesc.getMeasures()), 
cubeSegment.buildDictionaryMap());
+                        initialized = true;
+                    }
+                }
+            }
+            String[] rowArray = rowToArray(row);
+            baseCuboidBuilder.resetAggrs();
+            byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+            Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+            return new Tuple2<>(new ByteArray(rowKey), result);
+        }
+
+        private String[] rowToArray(Row row) {
+            String[] result = new String[row.size()];
+            for (int i = 0; i < row.size(); i++) {
+                final Object o = row.get(i);
+                if (o != null) {
+                    result[i] = o.toString();
+                } else {
+                    result[i] = null;
+                }
+            }
+            return result;
+        }
+    }
+
+    static class BaseCuboidReducerFunction2 implements Function2<Object[], 
Object[], Object[]> {
+        protected String cubeName;
+        protected CubeDesc cubeDesc;
+        protected int measureNum;
+        protected MeasureAggregators aggregators;
+        protected volatile transient boolean initialized = false;
 
-        BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, 
MeasureAggregators aggregators) {
-            this.cubeDesc = cubeDesc;
-            this.measureNum = measureNum;
-            this.aggregators = aggregators;
+        public BaseCuboidReducerFunction2(String cubeName) {
+            this.cubeName = cubeName;
+        }
+
+        public void init() {
+            KylinConfig kConfig = getKylinConfigForExecutor();
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+            cubeDesc = cubeInstance.getDescriptor();
+            aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+            measureNum = cubeDesc.getMeasures().size();
         }
 
         @Override
         public Object[] call(Object[] input1, Object[] input2) throws 
Exception {
+            if (initialized == false) {
+                synchronized (SparkCubingByLayer.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
             Object[] result = new Object[measureNum];
             aggregators.aggregate(input1, input2, result);
             return result;
         }
     }
 
-    class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
-        boolean[] needAggr;
+    static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
+        private boolean[] needAggr;
 
-        CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, 
MeasureAggregators aggregators, boolean[] needAggr) {
-            super(measureNum, cubeDesc, aggregators);
+        public CuboidReducerFunction2(String cubeName, boolean[] needAggr) {
+            super(cubeName);
             this.needAggr = needAggr;
         }
 
         @Override
         public Object[] call(Object[] input1, Object[] input2) throws 
Exception {
+            if (initialized == false) {
+                synchronized (SparkCubingByLayer.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
             Object[] result = new Object[measureNum];
             aggregators.aggregate(input1, input2, result, needAggr);
             return result;
@@ -296,30 +386,41 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> 
EMTPY_ITERATOR = new ArrayList(0);
 
-    class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, 
Object[]>, ByteArray, Object[]> {
-
-        String metaUrl;
-        CubeSegment cubeSegment;
-        CubeDesc cubeDesc;
-        CuboidScheduler cuboidScheduler;
-        NDCuboidBuilder ndCuboidBuilder;
-        RowKeySplitter rowKeySplitter;
-        transient boolean initialized = false;
-
-        CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc 
cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
-            this.metaUrl = metaUrl;
-            this.cubeSegment = cubeSegment;
-            this.cubeDesc = cubeDesc;
-            this.cuboidScheduler = cuboidScheduler;
-            this.ndCuboidBuilder = ndCuboidBuilder;
+    static class CuboidFlatMap implements 
PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+
+        private String cubeName;
+        private String segmentId;
+        private CubeSegment cubeSegment;
+        private CubeDesc cubeDesc;
+        private CuboidScheduler cuboidScheduler;
+        private NDCuboidBuilder ndCuboidBuilder;
+        private RowKeySplitter rowKeySplitter;
+        private volatile transient boolean initialized = false;
+
+        public CuboidFlatMap(String cubeName, String segmentId) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+        }
+
+        public void init() {
+            KylinConfig kConfig = getKylinConfigForExecutor();
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+            this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+            this.cubeDesc = cubeInstance.getDescriptor();
+            this.cuboidScheduler = new CuboidScheduler(cubeDesc);
+            this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new 
RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
         }
 
         @Override
         public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, 
Object[]> tuple2) throws Exception {
             if (initialized == false) {
-                loadKylinConfig(metaUrl);
-                initialized = true;
+                synchronized (SparkCubingByLayer.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
             }
 
             byte[] key = tuple2._1().array();
@@ -336,7 +437,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             List<Tuple2<ByteArray, Object[]>> tuples = new 
ArrayList(myChildren.size());
             for (Long child : myChildren) {
                 Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-                Pair<Integer, ByteArray> result = 
ndCuboidBuilder.buildKey(parentCuboid, childCuboid, 
rowKeySplitter.getSplitBuffers());
+                Pair<Integer, ByteArray> result = 
ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
+                        rowKeySplitter.getSplitBuffers());
 
                 byte[] newKey = new byte[result.getFirst()];
                 System.arraycopy(result.getSecond().array(), 0, newKey, 0, 
result.getFirst());
@@ -349,11 +451,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     //sanity check
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long 
totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int 
countMeasureIndex) {
+    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long 
totalCount, int thisLevel,
+            CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = 
cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
         if (count2 != totalCount * thisCuboidNum) {
-            throw new IllegalStateException(String.format("Sanity check 
failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, 
thisCuboidNum));
+            throw new IllegalStateException(
+                    String.format("Sanity check failed, level %s, total 
count(*) is %s; cuboid number %s", thisLevel,
+                            count2, thisCuboidNum));
         } else {
             logger.info("sanity check success for level " + thisLevel + ", 
count(*) is " + (count2 / thisCuboidNum));
         }
@@ -368,7 +473,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             }
         }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, 
Long>, Tuple2<ByteArray, Long>>() {
             @Override
-            public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> 
longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception {
+            public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> 
longTuple2, Tuple2<ByteArray, Long> longTuple22)
+                    throws Exception {
                 return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
             }
         })._2();
@@ -380,5 +486,5 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String path = metaUrl.substring(0, cut);
         HadoopUtil.getFileSystem(path).delete(new Path(path), true);
         logger.info("Delete metadata in HDFS for this job: " + path);
-    };
+    }
 }

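The executor side of that hand-off is the pattern repeated in EncodeBaseCuboid, the reducer functions, and CuboidFlatMap above: each task lazily locates the shipped kylin.properties through SparkFiles and rebuilds its metadata objects under double-checked locking, instead of deserializing broadcast CubeDesc/CubeSegment instances. A condensed, hypothetical sketch of just that pattern; loadHeavyState stands in for the CubeManager/CubeDescManager lookups, and everything except SparkFiles.get and the property file name is invented for the example:

    import java.io.File;
    import org.apache.spark.SparkFiles;
    import org.apache.spark.api.java.function.Function;

    // Hypothetical executor-side function illustrating the lazy, double-checked
    // initialization used throughout SparkCubingByLayer.
    public class LazyExecutorInit implements Function<String, Integer> {

        private volatile transient boolean initialized = false;
        private transient Object heavyState;   // e.g. codecs or cube descriptors
        private final String cubeName;         // cheap, serializable identifier

        public LazyExecutorInit(String cubeName) {
            this.cubeName = cubeName;
        }

        private Object loadHeavyState() {
            // The driver called sc.addFile(confPath + "/kylin.properties"), so the
            // executor can locate the very same file through SparkFiles.
            File props = new File(SparkFiles.get("kylin.properties"));
            String confDir = props.getParentFile().getAbsolutePath();
            System.setProperty("KYLIN_CONF", confDir); // mirrors KylinConfig.KYLIN_CONF
            return confDir + "#" + cubeName;           // placeholder for real metadata objects
        }

        @Override
        public Integer call(String row) throws Exception {
            if (!initialized) {
                synchronized (LazyExecutorInit.class) {
                    if (!initialized) {
                        heavyState = loadHeavyState();
                        initialized = true;
                    }
                }
            }
            return row.length();                        // trivial stand-in for the real per-row work
        }
    }
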
http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index 28a63d5..27acdd3 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -610,7 +610,7 @@
   "status_need_notify": [],
   "auto_merge_time_ranges": null,
   "retention_range": 0,
-  "engine_type": 2,
+  "engine_type": 4,
   "storage_type": 2,
   "override_kylin_properties": {
     "kylin.cube.algorithm": "LAYER"

http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java b/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java
index 7e79511..3dde9cf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java
@@ -35,6 +35,7 @@ public class Log4jConfigListener extends org.springframework.web.util.Log4jConfi
         if (!isDebugTomcat) {
             super.contextInitialized(event);
         }
+        System.setProperty("needCheckCC", "true");
     }
 
     @Override
