KYLIN-2095 Allow cube to override Hive job configuration by properties

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07e81fd0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07e81fd0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07e81fd0

Branch: refs/heads/master
Commit: 07e81fd0b744e782d84383e327b1923cfc178d42
Parents: cc2b59f
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Oct 17 22:07:57 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Oct 17 22:07:57 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../apache/kylin/source/hive/HiveMRInput.java   | 42 ++++++++++++--------
 2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 73ac788..5a06813 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -250,6 +250,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.job.mr.config.override.");
     }
 
+    public Map<String, String> getHiveConfigOverride() {
+        return getPropertiesByPrefix("kylin.hive.config.override.");
+    }
+
     public String getKylinSparkJobJarPath() {
         final String jobJar = getOptional("kylin.job.jar.spark");
         if (StringUtils.isNotEmpty(jobJar)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4ec8d3d..f3fceb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -20,6 +20,8 @@ package org.apache.kylin.source.hive;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
@@ -61,7 +63,6 @@ import com.google.common.collect.Sets;
 
 public class HiveMRInput implements IMRInput {
 
-    private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
@@ -157,9 +158,7 @@ public class HiveMRInput implements IMRInput {
             hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
             final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
-                hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
-            }
+            appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
             String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
 
             RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -178,10 +177,7 @@ public class HiveMRInput implements IMRInput {
 
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
-                hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
-                        kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
-            }
+            appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
             hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -192,6 +188,7 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
+
         public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
             ShellExecutable step = new ShellExecutable();
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -211,10 +208,7 @@ public class HiveMRInput implements IMRInput {
             if (lookupViewsTables.size() == 0) {
                 return null;
             }
-            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
-                hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
-                        kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
-            }
+            appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
             hiveCmdBuilder.addStatement(useDatabaseHql);
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -240,9 +234,7 @@ public class HiveMRInput implements IMRInput {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
             final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
-                hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
-            }
+            appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
             final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
@@ -301,7 +293,7 @@ public class HiveMRInput implements IMRInput {
             FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
             InputStream in = fs.open(file);
             try {
-                String content = IOUtils.toString(in);
+                String content = IOUtils.toString(in, Charset.defaultCharset());
                 return Long.valueOf(content.trim()); // strip the '\n' character
 
             } finally {
@@ -490,4 +482,22 @@ public class HiveMRInput implements IMRInput {
         }
     }
 
+
+    private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) {
+        final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
+        if (hiveConfOverride.isEmpty() == false) {
+            for (String key : hiveConfOverride.keySet()) {
+                hiveCmd.append("SET ").append(key).append("=").append(hiveConfOverride.get(key)).append(";\n");
+            }
+        }
+    }
+
+    private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) {
+        final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
+        if (hiveConfOverride.isEmpty() == false) {
+            for (String key : hiveConfOverride.keySet()) {
+                hiveCmdBuilder.addStatement("SET " + key + "=" + hiveConfOverride.get(key) + ";\n");
+            }
+        }
+    }
 }

Reply via email to