KYLIN-2188, refine SparkBatchCubingJobBuilder2
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/35b9d9ae Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/35b9d9ae Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/35b9d9ae Branch: refs/heads/master Commit: 35b9d9ae9692f2a0fcb4d56e7e47ecafa5f45663 Parents: 06138f5 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Thu Sep 14 10:39:57 2017 +0800 Committer: Roger Shi <rogershijich...@gmail.com> Committed: Fri Sep 15 14:35:56 2017 +0800 ---------------------------------------------------------------------- .../spark/SparkBatchCubingJobBuilder2.java | 43 +++++++++++--------- .../kylin/engine/spark/SparkCubingByLayer.java | 19 ++++++--- 2 files changed, 36 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/35b9d9ae/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 27bcff3..47ea3d0 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -42,9 +42,30 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { @Override protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { - IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final SparkExecutable sparkExecutable = new SparkExecutable(); sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); + configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath); + result.addTask(sparkExecutable); + } + + @Override + protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) { + + } + + private static String findJar(String className, String perferLibraryName) { + try { + return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName); + } catch (ClassNotFoundException e) { + logger.warn("failed to locate jar for class " + className + ", ignore it"); + } + + return ""; + } + + public static void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, + final String jobId, final String cuboidRootPath) { + IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), @@ -64,28 +85,10 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars()); sparkExecutable.setJars(jars.toString()); - sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE); - result.addTask(sparkExecutable); - } - - @Override - protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) { - - } - - private String findJar(String className, String perferLibraryName) { - try { - return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName); - } catch (ClassNotFoundException e) { - logger.warn("failed to locate jar for class " + className + ", ignore it"); - } - - return ""; } - private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) { + private static String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) { return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs"; } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/35b9d9ae/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 5d5b930..06cc988 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -143,7 +143,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final Job job = Job.getInstance(confOverwrite); logger.info("RDD Output path: {}", outputPath); - setHadoopConf(job); + setHadoopConf(job, cubeSegment, metaUrl); int countMeasureIndex = 0; for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { @@ -189,7 +189,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa // aggregate to calculate base cuboid allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel); - saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job); + saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig); // aggregate to ND cuboids for (level = 1; level <= totalLevels; level++) { @@ -199,7 +199,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa if (envConfig.isSparkSanityCheckEnabled() == true) { sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex); } - saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job); + saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig); allRDDs[level - 1].unpersist(); } allRDDs[totalLevels].unpersist(); @@ -207,7 +207,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa deleteHDFSMeta(metaUrl); } - protected void setHadoopConf(Job job) throws Exception { + protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); } @@ -218,17 +218,24 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa int partition = (int) (baseCuboidSize / rddCut); partition = Math.max(kylinConfig.getSparkMinPartition(), partition); partition = Math.min(kylinConfig.getSparkMaxPartition(), partition); + logger.info("Partition for spark cubing: {}", partition); return partition; } + protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config, + CubeSegment segment, int level) { + return rdd; + } + protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName, - final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Job job) throws Exception { + final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job, + final KylinConfig kylinConfig) throws Exception { final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat(); outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level); - rdd.mapToPair( + prepareOutput(rdd, kylinConfig, cubeSeg, level).mapToPair( new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { private volatile transient boolean initialized = false; BufferedMeasureCodec codec;