Fix NPE in CacheDictionary in Spark cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66bca9a6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66bca9a6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66bca9a6 Branch: refs/heads/master-hbase1.x Commit: 66bca9a676fda02aa88cc25bad2e545f779f6dde Parents: 2cf52b4 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Jan 7 10:17:33 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Jan 9 16:58:11 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/dict/CacheDictionary.java | 2 +- .../apache/kylin/engine/mr/BatchCubingJobBuilder.java | 5 ++--- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 7 +++---- .../org/apache/kylin/engine/mr/JobBuilderSupport.java | 14 +------------- .../engine/spark/SparkBatchCubingJobBuilder2.java | 1 + .../apache/kylin/engine/spark/SparkCubingByLayer.java | 2 +- .../kylin/engine/spark/SparkCubingJobBuilder.java | 1 - 7 files changed, 9 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java index d7ed6bd..b2bad53 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CacheDictionary.java @@ -35,7 +35,7 @@ public abstract class CacheDictionary<T> extends Dictionary<T> { protected transient int baseId; - protected transient BytesConverter<T> bytesConvert; + protected BytesConverter<T> bytesConvert; public CacheDictionary() { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 456f615..36c12a1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -65,12 +65,11 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { // Phase 3: Build Cube RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey(); final int groupRowkeyColumnsCount = seg.getCubeDesc().getBuildLevel(); - final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, groupRowkeyColumnsCount); // base cuboid step - result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId)); + result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps for (int i = 1; i <= groupRowkeyColumnsCount; i++) { - result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i)); + result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i)); } outputSide.addStepPhase3_BuildCube(result, cuboidRootPath); http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 700f821..dd866bd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -76,14 +76,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } - private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { + protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { final int maxLevel = seg.getCubeDesc().getBuildLevel(); - final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, maxLevel); // base cuboid step - result.addTask(createBaseCuboidStep(cuboidOutputTempPath[0], jobId)); + result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps for (int i = 1; i <= maxLevel; i++) { - result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath[i - 1], cuboidOutputTempPath[i], i, jobId)); + result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 14252ee..696b22a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -193,23 +193,11 @@ public class JobBuilderSupport { return buf.append(" -").append(paraName).append(" ").append(paraValue); } - public String[] getCuboidOutputPaths(String cuboidRootPath, int levels) { - String[] paths = new String[levels + 1]; - for (int i = 0; i <= levels; i++) { - if (i == 0) { - paths[i] = cuboidRootPath + "base_cuboid"; - } else { - paths[i] = 
cuboidRootPath + "level_" + i + "_cuboid"; - } - } - return paths; - } - public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) { if (level == 0) { return cuboidRootPath + "base_cuboid"; } else { - return cuboidRootPath + level + "level_cuboid"; + return cuboidRootPath + "level_" + level + "_cuboid"; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 9431468..55e11c4 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -41,6 +41,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { super(newSegment, submitter); } + @Override protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { } http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 53c1f96..93cce81 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -260,7 +260,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa // aggregate to ND cuboids PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, 
Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); - for (level = 1; level < totalLevels; level++) { + for (level = 1; level <= totalLevels; level++) { partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); logger.info("Level " + level + " partition number: " + partition); allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel); http://git-wip-us.apache.org/repos/asf/kylin/blob/66bca9a6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java index edd9460..76e4521 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java @@ -50,7 +50,6 @@ public class SparkCubingJobBuilder extends JobBuilderSupport { public DefaultChainedExecutable build() { final CubingJob result = CubingJob.createBuildJob(seg, submitter, config); - final String jobId = result.getId(); inputSide.addStepPhase1_CreateFlatTable(result); final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);