KYLIN-2188, refine SparkBatchCubingJobBuilder2

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/35b9d9ae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/35b9d9ae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/35b9d9ae

Branch: refs/heads/master
Commit: 35b9d9ae9692f2a0fcb4d56e7e47ecafa5f45663
Parents: 06138f5
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Thu Sep 14 10:39:57 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Fri Sep 15 14:35:56 2017 +0800

----------------------------------------------------------------------
 .../spark/SparkBatchCubingJobBuilder2.java      | 43 +++++++++++---------
 .../kylin/engine/spark/SparkCubingByLayer.java  | 19 ++++++---
 2 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/35b9d9ae/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 27bcff3..47ea3d0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -42,9 +42,30 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
     @Override
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
-        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         final SparkExecutable sparkExecutable = new SparkExecutable();
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
+        configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
+        result.addTask(sparkExecutable);
+    }
+
+    @Override
+    protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
+
+    }
+
+    private static String findJar(String className, String perferLibraryName) {
+        try {
+            return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName);
+        } catch (ClassNotFoundException e) {
+            logger.warn("failed to locate jar for class " + className + ", ignore it");
+        }
+
+        return "";
+    }
+
+    public static void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
+            final String jobId, final String cuboidRootPath) {
+        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
@@ -64,28 +85,10 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
-
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
-        result.addTask(sparkExecutable);
-    }
-
-    @Override
-    protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
-
-    }
-
-    private String findJar(String className, String perferLibraryName) {
-        try {
-            return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName);
-        } catch (ClassNotFoundException e) {
-            logger.warn("failed to locate jar for class " + className + ", ignore it");
-        }
-
-        return "";
     }
 
-    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
+    private static String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
         return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs";
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/35b9d9ae/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 5d5b930..06cc988 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -143,7 +143,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final Job job = Job.getInstance(confOverwrite);
 
         logger.info("RDD Output path: {}", outputPath);
-        setHadoopConf(job);
+        setHadoopConf(job, cubeSegment, metaUrl);
 
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
@@ -189,7 +189,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
 
-        saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job);
+        saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
@@ -199,7 +199,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job);
+            saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -207,7 +207,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         deleteHDFSMeta(metaUrl);
     }
 
-    protected void setHadoopConf(Job job) throws Exception {
+    protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
     }
@@ -218,17 +218,24 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         int partition = (int) (baseCuboidSize / rddCut);
         partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
         partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+        logger.info("Partition for spark cubing: {}", partition);
         return partition;
     }
 
+    protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
+            CubeSegment segment, int level) {
+        return rdd;
+    }
+
     protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
-            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Job job) throws Exception {
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job,
+            final KylinConfig kylinConfig) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
 
-        rdd.mapToPair(
+        prepareOutput(rdd, kylinConfig, cubeSeg, level).mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     private volatile transient boolean initialized = false;
                     BufferedMeasureCodec codec;
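
----------------------------------------------------------------------

Note: the refactored SparkCubingByLayer exposes a protected prepareOutput hook
that is a no-op by default (it returns the RDD unchanged) and is applied to each
cuboid level inside saveToHDFS just before the level is written out. A minimal
sketch of how a subclass might use it, assuming a hypothetical class name
SparkCubingByLayerRepartition that is not part of this commit:

package org.apache.kylin.engine.spark;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.cube.CubeSegment;
import org.apache.spark.api.java.JavaPairRDD;

// Hypothetical subclass (illustration only): uses the new prepareOutput hook
// to cap the number of output partitions per cuboid level before writing.
public class SparkCubingByLayerRepartition extends SparkCubingByLayer {

    @Override
    protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
            CubeSegment segment, int level) {
        // Reuse the existing Spark partition limit from KylinConfig.
        int partitions = Math.min(config.getSparkMaxPartition(), rdd.getNumPartitions());
        return rdd.repartition(partitions);
    }
}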
