Hive job uses the MR job configuration overridden by cube properties

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac356f01
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac356f01
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac356f01

Branch: refs/heads/master
Commit: ac356f014d52c4b13ad72e9d6a537e50e9ace5fb
Parents: c92f79a
Author: lijieliang <lijieli...@cmss.chinamobile.com>
Authored: Fri Oct 14 13:01:32 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Oct 17 18:25:26 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/hive/HiveMRInput.java   | 24 +++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
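In short, the patch below makes the Hive steps that build the intermediate flat table honor a cube-level MR configuration override for the YARN queue: when the cube's MR override map contains "mapreduce.job.queuename", a matching Hive SET statement is prepended so that the MapReduce jobs Hive launches run in that queue. The standalone Java sketch that follows only illustrates the same pattern under simplified assumptions; the class QueueOverrideSketch, the helper queueSetStatement, and the example queue name "analytics" are illustrative and not part of the patch.

import java.util.HashMap;
import java.util.Map;

public class QueueOverrideSketch {

    // Same key the patch looks up in the cube-level MR override map.
    private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";

    // Builds a Hive "SET ...;\n" statement when the override map names a queue, else returns "".
    static String queueSetStatement(Map<String, String> mrConfigOverride) {
        String queue = mrConfigOverride.get(MR_OVERRIDE_JOB_QUEUENAME);
        return queue == null ? "" : "SET " + MR_OVERRIDE_JOB_QUEUENAME + "=" + queue + ";\n";
    }

    public static void main(String[] args) {
        // Hypothetical override map; in Kylin it would come from the segment's KylinConfig.
        Map<String, String> override = new HashMap<String, String>();
        override.put(MR_OVERRIDE_JOB_QUEUENAME, "analytics");

        StringBuilder hiveInitBuf = new StringBuilder();
        hiveInitBuf.append(queueSetStatement(override));
        System.out.print(hiveInitBuf); // prints: SET mapreduce.job.queuename=analytics;
    }
}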


http://git-wip-us.apache.org/repos/asf/kylin/blob/ac356f01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 202e480..2ec1fbb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -35,6 +35,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -60,6 +61,8 @@ import com.google.common.collect.Sets;
 
 public class HiveMRInput implements IMRInput {
 
+    private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename";
+
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         return new BatchCubingInputSide(flatDesc);
@@ -154,7 +157,10 @@ public class HiveMRInput implements IMRInput {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+                hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+            }
             String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
 
             RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
@@ -172,6 +178,11 @@ public class HiveMRInput implements IMRInput {
             final ShellExecutable step = new ShellExecutable();
 
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+                hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+                        kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+            }
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
             hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
@@ -187,7 +198,7 @@ public class HiveMRInput implements IMRInput {
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
             HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
 
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
             MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
             final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
 
@@ -201,6 +212,10 @@ public class HiveMRInput implements IMRInput {
             if (lookupViewsTables.size() == 0) {
                 return null;
             }
+            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+                hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" +
+                        kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n");
+            }
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
             hiveCmdBuilder.addStatement(useDatabaseHql);
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
@@ -225,7 +240,10 @@ public class HiveMRInput implements IMRInput {
         public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-
+            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
+            if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) {
+                hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n");
+            }
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
             final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
