KYLIN-1656 Improve performance of MRv2 engine by making each mapper handle a configured number of records

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6aad4e9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6aad4e9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6aad4e9

Branch: refs/heads/master
Commit: c6aad4e9636dbc52b3459268b45b433dc5f628ec
Parents: 6c32fd6
Author: gaodayue <gaoda...@meituan.com>
Authored: Thu May 5 12:59:19 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed May 18 10:38:32 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/common/util/BufferedLogger.java       |  47 ++++++
 .../org/apache/kylin/job/JoinedFlatTable.java   |  67 ++++----
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/source/hive/HiveMRInput.java   | 153 ++++++++++++++++++-
 5 files changed, 243 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1062749..14dda82 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -293,6 +293,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.mapreduce.max.reducer.number", "500"));
     }
 
+    public int getHadoopJobMapperInputRows() {
+        return Integer.parseInt(getOptional("kylin.job.mapreduce.mapper.input.rows", "500000"));
+    }
+
     public boolean getRunAsRemoteCommand() {
         return Boolean.parseBoolean(getOptional("kylin.job.run.as.remote.cmd"));
     }
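
For reference, a minimal sketch of how the new knob would be consumed (the property name and default come from the hunk above; the surrounding lines are illustrative only):

    // Sketch: reading the rows-per-mapper knob added in this commit.
    // "kylin.job.mapreduce.mapper.input.rows" defaults to 500000 and can be
    // overridden in kylin.properties.
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    int rowsPerMapper = config.getHadoopJobMapperInputRows(); // 500000 unless overridden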

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
new file mode 100644
index 0000000..cef598d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+/**
+ * A Logger that remembers all the logged messages.
+ *
+ * <b>This class is not thread-safe.</b>
+ */
+public class BufferedLogger implements Logger {
+    private final org.slf4j.Logger wrappedLogger;
+    private final StringBuilder buffer = new StringBuilder();
+
+    public BufferedLogger(org.slf4j.Logger wrappedLogger) {
+        this.wrappedLogger = wrappedLogger;
+    }
+
+    @Override
+    public void log(String message) {
+        wrappedLogger.info(message);
+        buffer.append(message).append("\n");
+    }
+
+    public String getBufferedLog() {
+        return buffer.toString();
+    }
+
+    public void resetBuffer() {
+        buffer.setLength(0);
+    }
+}
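
A minimal usage sketch for BufferedLogger (illustrative, not part of the commit): each message is forwarded to the wrapped slf4j logger and also buffered, so a job step can hand the full transcript to its ExecuteResult.

    org.slf4j.Logger slf4j = org.slf4j.LoggerFactory.getLogger("demo"); // hypothetical logger name
    BufferedLogger stepLogger = new BufferedLogger(slf4j);
    stepLogger.log("total input rows = 42");          // logged via slf4j and buffered
    String transcript = stepLogger.getBufferedLog();  // "total input rows = 42\n"
    stepLogger.resetBuffer();                         // start a fresh transcript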

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 5886325..6ae8110 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -55,6 +55,36 @@ public class JoinedFlatTable {
         return storageDfsDir + "/" + intermediateTableDesc.getTableName();
     }
 
+    public static String generateHiveSetStatements(JobEngineConfig engineConfig) throws IOException {
+        StringBuilder buffer = new StringBuilder();
+        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
+
+        if (hadoopPropertiesFile.exists()) {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder;
+            Document doc;
+            try {
+                builder = factory.newDocumentBuilder();
+                doc = builder.parse(hadoopPropertiesFile);
+                NodeList nl = doc.getElementsByTagName("property");
+                for (int i = 0; i < nl.getLength(); i++) {
+                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
+                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
+                    if (!name.equals("tmpjars")) {
+                        buffer.append("SET " + name + "=" + value + ";\n");
+                    }
+                }
+
+            } catch (ParserConfigurationException e) {
+                throw new IOException(e);
+            } catch (SAXException e) {
+                throw new IOException(e);
+            }
+        }
+
+        return buffer.toString();
+    }
+
+
     public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
         StringBuilder ddl = new StringBuilder();
 
@@ -86,37 +116,16 @@ public class JoinedFlatTable {
 
     public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
         StringBuilder sql = new StringBuilder();
-
-        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
-
-        if (hadoopPropertiesFile.exists()) {
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder;
-            Document doc;
-            try {
-                builder = factory.newDocumentBuilder();
-                doc = builder.parse(hadoopPropertiesFile);
-                NodeList nl = doc.getElementsByTagName("property");
-                for (int i = 0; i < nl.getLength(); i++) {
-                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
-                    if (name.equals("tmpjars") == false) {
-                        sql.append("SET " + name + "=" + value + ";").append("\n");
-                    }
-                }
-
-            } catch (ParserConfigurationException e) {
-                throw new IOException(e);
-            } catch (SAXException e) {
-                throw new IOException(e);
-            }
-        }
-
+        sql.append(generateHiveSetStatements(engineConfig));
+        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
-
         return sql.toString();
     }
 
+    public static String generateRedistributeDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        final String tableName = intermediateTableDesc.getTableName();
+        return "INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName + " distribute by rand();\n";
+    }
+
     public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
         StringBuilder sql = new StringBuilder();
         sql.append("SELECT" + "\n");
@@ -135,6 +144,10 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
+    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+        return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n";
+    }
+
     private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
         Map<String, String> tableAliasMap = new HashMap<String, String>();
 

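To make the new generators concrete: for a hypothetical flat table named kylin_intermediate_fact and a hypothetical output directory /tmp/kylin/job_1/row_count, the two methods above emit exactly these HiveQL strings:

    INSERT OVERWRITE TABLE kylin_intermediate_fact SELECT * FROM kylin_intermediate_fact distribute by rand();

    INSERT OVERWRITE DIRECTORY '/tmp/kylin/job_1/row_count' SELECT count(*) FROM kylin_intermediate_fact;

The "distribute by rand()" clause sends each row to a random reducer, so the rewritten table comes back as evenly sized files, one per reducer.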
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index d47d550..50e8238 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -37,6 +37,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
+    public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Intermediate Flat Hive Table";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 9997b09..248a57b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,9 +19,11 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,12 +31,17 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -46,7 +53,6 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 public class HiveMRInput implements IMRInput {
@@ -109,11 +115,14 @@ public class HiveMRInput implements IMRInput {
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+
             jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
             AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
             if(task != null) {
                 jobFlow.addTask(task);
             }
+            jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName));
         }
 
         public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
@@ -142,7 +151,6 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-
         public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
             ShellExecutable step = new ShellExecutable();;
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -185,6 +193,27 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
+        public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+            StringBuilder hiveInitBuf = new StringBuilder();
+            hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
+            try {
+                hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate hive set statements for RedistributeFlatHiveTableStep", e);
+            }
+
+            String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+
+            RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+            step.setInitStatement(hiveInitBuf.toString());
+            step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir));
+            step.setRowCountOutputDir(rowCountOutputDir);
+            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeDataStatement(flatTableDesc));
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             GarbageCollectionStep step = new GarbageCollectionStep();
@@ -205,6 +234,126 @@ public class HiveMRInput implements IMRInput {
         }
     }
 
+    public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
+        private final BufferedLogger stepLogger = new BufferedLogger(logger);
+
+        private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement(getInitStatement());
+            hiveCmdBuilder.addStatement("set 
hive.exec.compress.output=false;\n");
+            hiveCmdBuilder.addStatement(getSelectRowCountStatement());
+            final String cmd = hiveCmdBuilder.build();
+
+            stepLogger.log("Compute row count of flat hive table, cmd: ");
+            stepLogger.log(cmd);
+
+            Pair<Integer, String> response = cmdExecutor.execute(cmd, 
stepLogger);
+            if (response.getFirst() != 0) {
+                throw new RuntimeException("Failed to compute row count of 
flat hive table");
+            }
+        }
+
+        private long readRowCountFromFile(Path file) throws IOException {
+            FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
+            InputStream in = fs.open(file);
+            try {
+                String content = IOUtils.toString(in);
+                return Long.valueOf(content.trim()); // strip the '\n' character
+
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+
+        private int determineNumReducer(KylinConfig config) throws IOException {
+            computeRowCount(config.getCliCommandExecutor());
+
+            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+            long rowCount = readRowCountFromFile(rowCountFile);
+            int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+            numReducers = Math.max(1, numReducers);
+
+            stepLogger.log("total input rows = " + rowCount);
+            stepLogger.log("expected input rows per mapper = " + 
mapperInputRows);
+            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " 
+ numReducers);
+
+            return numReducers;
+        }
+
+        private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement(getInitStatement());
+            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + 
numReducers + ";\n");
+            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
+            hiveCmdBuilder.addStatement(getRedistributeDataStatement());
+            final String cmd = hiveCmdBuilder.toString();
+
+            stepLogger.log("Redistribute table, cmd: ");
+            stepLogger.log(cmd);
+
+            Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+            if (response.getFirst() != 0) {
+                throw new RuntimeException("Failed to redistribute flat hive table");
+            }
+        }
+
+        private KylinConfig getCubeSpecificConfig() {
+            String cubeName = CubingExecutableUtil.getCubeName(getParams());
+            CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = manager.getCube(cubeName);
+            return cube.getConfig();
+        }
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            KylinConfig config = getCubeSpecificConfig();
+
+            try {
+                int numReducers = determineNumReducer(config);
+                redistributeTable(config, numReducers);
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+            } catch (Exception e) {
+                logger.error("job:" + getId() + " execute finished with 
exception", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, 
stepLogger.getBufferedLog());
+            }
+        }
+
+        public void setInitStatement(String sql) {
+            setParam("HiveInit", sql);
+        }
+
+        public String getInitStatement() {
+            return getParam("HiveInit");
+        }
+
+        public void setSelectRowCountStatement(String sql) {
+            setParam("HiveSelectRowCount", sql);
+        }
+
+        public String getSelectRowCountStatement() {
+            return getParam("HiveSelectRowCount");
+        }
+
+        public void setRedistributeDataStatement(String sql) {
+            setParam("HiveRedistributeData", sql);
+        }
+
+        public String getRedistributeDataStatement() {
+            return getParam("HiveRedistributeData");
+        }
+
+        public void setRowCountOutputDir(String rowCountOutputDir) {
+            setParam("rowCountOutputDir", rowCountOutputDir);
+        }
+
+        public String getRowCountOutputDir() {
+            return getParam("rowCountOutputDir");
+        }
+    }
+
     public static class GarbageCollectionStep extends AbstractExecutable {
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

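Tying the pieces together, the sizing logic in determineNumReducer divides the measured row count by the configured rows-per-mapper to pick the reducer count for the redistribute pass, which in turn fixes the number and size of the flat table's files. A worked example with hypothetical numbers:

    // Worked example of determineNumReducer's arithmetic (values are hypothetical).
    long rowCount = 12345678L;     // read back from <rowCountOutputDir>/000000_0
    int mapperInputRows = 500000;  // kylin.job.mapreduce.mapper.input.rows (default)
    int numReducers = Math.max(1, Math.round(rowCount / (float) mapperInputRows));
    // Math.round(24.69f) == 25, so 25 reducers write ~25 files of roughly
    // 500000 rows each; downstream MR jobs then read about one file per mapper.

Since Hive writes one output file per reducer here (hive.merge.mapredfiles is forced off), each subsequent mapper's input is bounded by the configured row count, which is what the commit title promises.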