KYLIN-2811, fix NPE in SparkCubingByLayer: pass metaUrl into the executor-side functions instead of reading it from a static field

Signed-off-by: Hongbin Ma <mahong...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/727920b4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/727920b4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/727920b4

Branch: refs/heads/KYLIN-2814
Commit: 727920b4d3642aaa3657d90b7f3dce7dcdd68fe2
Parents: a1c234a
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Mon Aug 28 12:47:58 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Aug 28 14:41:26 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/spark/SparkCubingByLayer.java  | 43 +++++++++++---------
 1 file changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
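
The NPE appears to stem from metaUrl being a static field: it was assigned in execute() on the Spark driver, but the encode/reduce/flat-map functions run in executor JVMs where that static is never set, so getKylinConfigForExecutor() worked with a null metadata URL. The patch deletes the static and instead threads metaUrl through the constructors of every serialized function (EncodeBaseCuboid, BaseCuboidReducerFunction2, CuboidReducerFunction2, CuboidFlatMap) and through saveToHDFS, so the value ships to the executors inside the closures. A minimal sketch of the pattern follows the diff.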


http://git-wip-us.apache.org/repos/asf/kylin/blob/727920b4/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index a3a6ad0..a03e238 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -100,7 +100,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Configuration Path").create("confPath");
 
     private Options options;
-    private static String metaUrl;
 
     public SparkCubingByLayer() {
         options = new Options();
@@ -117,7 +116,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig getKylinConfigForExecutor() {
+    public static KylinConfig getKylinConfigForExecutor(String metaUrl) {
         File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
         String confPath = file.getParentFile().getAbsolutePath();
         System.setProperty(KylinConfig.KYLIN_CONF, confPath);
@@ -128,7 +127,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
@@ -178,17 +177,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId));
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
         }
 
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(cubeName, needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
@@ -202,18 +201,18 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
 
-        saveToHDFS(allRDDs[0], cubeName, cubeSegment, outputPath, 0, confOverwrite);
+        saveToHDFS(allRDDs[0], cubeName, metaUrl, cubeSegment, outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
             long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
             partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId))
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], cubeName, cubeSegment, outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], cubeName, metaUrl, cubeSegment, outputPath, level, confOverwrite);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -231,7 +230,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName,
+    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, final String metaUrl,
             final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
@@ -256,7 +255,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         if (!initialized) {
                             synchronized (SparkCubingByLayer.class) {
                                 if (!initialized) {
-                                    KylinConfig kylinConfig = getKylinConfigForExecutor();
+                                    KylinConfig kylinConfig = getKylinConfigForExecutor(metaUrl);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -278,10 +277,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
         private String segmentId;
+        private String metaurl;
 
-        public EncodeBaseCuboid(String cubeName, String segmentId) {
+        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
+            this.metaurl = metaurl;
         }
 
         @Override
@@ -289,7 +290,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = getKylinConfigForExecutor();
+                        KylinConfig kConfig = getKylinConfigForExecutor(metaurl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,17 +328,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
         protected String cubeName;
+        protected String metaUrl;
         protected CubeDesc cubeDesc;
         protected int measureNum;
         protected MeasureAggregators aggregators;
         protected volatile transient boolean initialized = false;
 
-        public BaseCuboidReducerFunction2(String cubeName) {
+        public BaseCuboidReducerFunction2(String cubeName, String metaUrl) {
             this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor();
+            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -363,8 +366,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
-        public CuboidReducerFunction2(String cubeName, boolean[] needAggr) {
-            super(cubeName);
+        public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
+            super(cubeName, metaUrl);
             this.needAggr = needAggr;
         }
 
@@ -390,6 +393,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         private String cubeName;
         private String segmentId;
+        private String metaUrl;
         private CubeSegment cubeSegment;
         private CubeDesc cubeDesc;
         private CuboidScheduler cuboidScheduler;
@@ -397,13 +401,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
 
-        public CuboidFlatMap(String cubeName, String segmentId) {
+        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor();
+            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();

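----------------------------------------------------------------------

For readers outside the Kylin codebase: below is a minimal, self-contained
sketch of the pattern the patch adopts. The class and method names are
hypothetical stand-ins, not the real Kylin or Spark API, and the placeholder
print stands in for KylinConfig/CubeManager initialization. The idea is that
executor-side state rides along as an instance field of the serialized
function, and heavyweight objects are created lazily under double-checked
locking, as EncodeBaseCuboid and CuboidFlatMap now do with metaUrl.

import java.io.Serializable;

// Hypothetical stand-in for Spark's serializable function interfaces.
interface SerializableFn<T, R> extends Serializable {
    R call(T input) throws Exception;
}

class EncodeBaseCuboidSketch implements SerializableFn<String, String> {
    // Captured on the driver, serialized into the closure, and therefore
    // non-null on every executor -- unlike a driver-side static field.
    private final String cubeName;
    private final String metaUrl;

    // transient: never serialized, so each executor JVM re-initializes;
    // volatile: makes the double-checked locking below safe.
    private transient volatile boolean initialized = false;

    EncodeBaseCuboidSketch(String cubeName, String metaUrl) {
        this.cubeName = cubeName;
        this.metaUrl = metaUrl;
    }

    @Override
    public String call(String row) throws Exception {
        if (!initialized) {
            synchronized (EncodeBaseCuboidSketch.class) {
                if (!initialized) {
                    initConfig(metaUrl, cubeName);
                    initialized = true;
                }
            }
        }
        return row; // the real function would dictionary-encode the row here
    }

    private void initConfig(String metaUrl, String cubeName) {
        // Placeholder for KylinConfig/CubeDescManager initialization.
        System.out.println("init " + cubeName + " from " + metaUrl);
    }

    public static void main(String[] args) throws Exception {
        // Driver side: the value is fixed at construction time, so it
        // survives serialization to the executors.
        EncodeBaseCuboidSketch fn =
                new EncodeBaseCuboidSketch("sample_cube", "kylin_metadata@hbase");
        System.out.println(fn.call("a,b,c"));
    }
}

Had metaUrl stayed static, serialization would not have carried it: statics
are per-JVM, and the executor JVMs never run execute(), which was the only
place the field was assigned.

----------------------------------------------------------------------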