This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 0da41e2  KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
0da41e2 is described below

commit 0da41e20a652794d9328c819977754ec8c4f9941
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Thu Sep 3 23:19:04 2020 +0800

    KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
---
 .../kylin/engine/spark/job/CubeBuildJob.java       | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 3a44d84..bbf50e8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -77,7 +77,8 @@ public class CubeBuildJob extends SparkApplication {
     private CubeManager cubeManager;
     private CubeInstance cubeInstance;
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
-    private Map<Long, Short> cuboidShardNum = Maps.newHashMap();
+    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
     public static void main(String[] args) {
         CubeBuildJob cubeBuildJob = new CubeBuildJob();
         cubeBuildJob.execute(args);
@@ -217,7 +218,10 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built 
cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
-            long parentDSCnt = parentDS.count();
+            // record the source count of flat table
+            if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
+                cuboidsRowCount.putIfAbsent(info.getLayoutId(), 
parentDS.count());
+            }
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null 
when building.");
@@ -229,8 +233,7 @@ public class CubeBuildJob extends SparkApplication {
 
                     @Override
                     public LayoutEntity build() throws IOException {
-                        return buildCuboid(seg, index, parentDS, st, 
info.getLayoutId(),
-                                parentDSCnt);
+                        return buildCuboid(seg, index, parentDS, st, 
info.getLayoutId());
                     }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
@@ -292,7 +295,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private LayoutEntity buildCuboid(SegmentInfo seg, LayoutEntity cuboid, 
Dataset<Row> parent,
-                                    SpanningTree spanningTree, long parentId, 
long parentDSCnt) throws IOException {
+                                    SpanningTree spanningTree, long parentId) 
throws IOException {
         String parentName = String.valueOf(parentId);
         if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
             parentName = "flat table";
@@ -308,7 +311,7 @@ public class CubeBuildJob extends SparkApplication {
             Set<Integer> orderedDims = 
layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = 
afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
                     
.sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, 
dimIndexes, cuboid.getOrderedMeasures(),
                     spanningTree, false);
@@ -320,7 +323,7 @@ public class CubeBuildJob extends SparkApplication {
                     .select(NSparkCubingUtil.getColumns(rowKeys, 
layoutEntity.getOrderedMeasures().keySet()))
                     
.sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
 
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
         ss.sparkContext().setJobDescription(null);
         logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), 
seg.id());
@@ -328,7 +331,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, 
LayoutEntity layout,
-                                     long parentDSCnt) throws IOException {
+                                     long parentId) throws IOException {
         long layoutId = layout.getId();
 
         // for spark metrics
@@ -349,8 +352,11 @@ public class CubeBuildJob extends SparkApplication {
         if (rowCount == -1) {
             infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, 
use count() to collect cuboid rows.'");
             logger.debug("Can not get cuboid row cnt, use count() to collect 
cuboid rows.");
-            layout.setRows(dataset.count());
-            layout.setSourceRows(parentDSCnt);
+            long cuboidRowCnt = dataset.count();
+            layout.setRows(cuboidRowCnt);
+            // record the row count of cuboid
+            cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
+            layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
             layout.setRows(rowCount);
             
layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));

Reply via email to