This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 451b7bae796e47358900678ac18dce24a2e6d8df
Author: Yifei Wu <vaful...@gmail.com>
AuthorDate: Tue Feb 27 14:34:23 2018 +0800

    KYLIN-3267 MR config override at project/cube level only for mem-hungry 
build steps
---
 build/conf/kylin_job_conf_inmem.xml                |  4 ++
 .../org/apache/kylin/common/KylinConfigBase.java   |  5 +++
 .../engine/mr/common/MapReduceExecutable.java      | 21 ++++++---
 .../engine/mr/common/MapReduceExecutableTest.java  | 50 ++++++++++++++++++++++
 examples/test_case_data/localmeta/kylin.properties |  2 +
 .../sandbox/kylin_job_conf_inmem.xml               |  4 ++
 6 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/build/conf/kylin_job_conf_inmem.xml 
b/build/conf/kylin_job_conf_inmem.xml
index d69435a..ddda4dd 100644
--- a/build/conf/kylin_job_conf_inmem.xml
+++ b/build/conf/kylin_job_conf_inmem.xml
@@ -15,6 +15,10 @@
   limitations under the License.
 -->
 <configuration>
+    <property>
+        <name>mapreduce.job.is-mem-hungry</name>
+        <value>true</value>
+    </property>
 
     <property>
         <name>mapreduce.job.split.metainfo.maxsize</name>
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index eee2d09..7596d2c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1079,6 +1079,11 @@ abstract public class KylinConfigBase implements 
Serializable {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
+    // used for some mem-hungry step
+    public Map<String, String> getMemHungryConfigOverride() {
+        return 
getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override.");
+    }
+
     public Map<String, String> getUHCMRConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
     }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index ca1311d..ae285cc 100755
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -120,7 +120,8 @@ public class MapReduceExecutable extends AbstractExecutable 
{
                 job = new 
Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
                 logger.info("mr_job_id:" + 
extra.get(ExecutableConstants.MR_JOB_ID) + " resumed");
             } else {
-                final Constructor<? extends AbstractHadoopJob> constructor = 
ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+                final Constructor<? extends AbstractHadoopJob> constructor = 
ClassUtil
+                        .forName(mapReduceJobClass, 
AbstractHadoopJob.class).getConstructor();
                 final AbstractHadoopJob hadoopJob = constructor.newInstance();
                 hadoopJob.setConf(conf);
                 hadoopJob.setAsync(true); // so the ToolRunner.run() returns 
right away
@@ -154,7 +155,8 @@ public class MapReduceExecutable extends AbstractExecutable 
{
                     mgr.updateJobOutput(getId(), ExecutableState.ERROR, 
hadoopCmdOutput.getInfo(), "killed by admin");
                     return new ExecuteResult(ExecuteResult.State.FAILED, 
"killed by admin");
                 }
-                if (status == JobStepStatusEnum.WAITING && (newStatus == 
JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus 
== JobStepStatusEnum.RUNNING)) {
+                if (status == JobStepStatusEnum.WAITING && (newStatus == 
JobStepStatusEnum.FINISHED
+                        || newStatus == JobStepStatusEnum.ERROR || newStatus 
== JobStepStatusEnum.RUNNING)) {
                     final long waitTime = System.currentTimeMillis() - 
getStartTime();
                     setMapReduceWaitTime(waitTime);
                 }
@@ -262,24 +264,31 @@ public class MapReduceExecutable extends 
AbstractExecutable {
         options.addOption(OPTION_CUBE_NAME);
         CustomParser parser = new CustomParser();
         CommandLine commandLine = parser.parse(options, jobParams);
-        
+
         String confFile = commandLine.getOptionValue(BatchConstants.ARG_CONF);
         String cubeName = 
commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME);
         List<String> remainingArgs = Lists.newArrayList();
-        
+
         if (StringUtils.isNotBlank(confFile)) {
             conf.addResource(new Path(confFile));
         }
-        
+
         if (StringUtils.isNotBlank(cubeName)) {
             for (Map.Entry<String, String> entry : 
CubeManager.getInstance(config).getCube(cubeName).getConfig()
                     .getMRConfigOverride().entrySet()) {
                 conf.set(entry.getKey(), entry.getValue());
             }
+            if (conf.get("mapreduce.job.is-mem-hungry") != null
+                    && 
Boolean.valueOf(conf.get("mapreduce.job.is-mem-hungry"))) {
+                for (Map.Entry<String, String> entry : 
CubeManager.getInstance(config).getCube(cubeName).getConfig()
+                        .getMemHungryConfigOverride().entrySet()) {
+                    conf.set(entry.getKey(), entry.getValue());
+                }
+            }
             remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
             remainingArgs.add(cubeName);
         }
-        
+
         remainingArgs.addAll(parser.getRemainingArgs());
         return (String[]) remainingArgs.toArray(new 
String[remainingArgs.size()]);
     }
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java
new file mode 100644
index 0000000..6e4a57d
--- /dev/null
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.HotLoadKylinPropertiesTestCase;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MapReduceExecutableTest extends HotLoadKylinPropertiesTestCase {
+
+    @Test
+    public void testOverwriteJobConf() throws Exception {
+        MapReduceExecutable executable = new MapReduceExecutable();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        Method method = 
MapReduceExecutable.class.getDeclaredMethod("overwriteJobConf", 
Configuration.class,
+                KylinConfig.class, new String[] {}.getClass());
+        method.setAccessible(true);
+        Configuration conf = new Configuration();
+        conf.set("mapreduce.job.is-mem-hungry", "true");
+        method.invoke(executable, conf, config, new String[] { "-cubename", 
"ci_inner_join_cube" });
+        Assert.assertEquals("mem-test1", conf.get("test1"));
+        Assert.assertEquals("mem-test2", conf.get("test2"));
+
+        conf.clear();
+        method.invoke(executable, conf, config, new String[] { "-cubename", 
"ci_inner_join_cube" });
+        Assert.assertEquals("test1", conf.get("test1"));
+        Assert.assertEquals("test2", conf.get("test2"));
+    }
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/kylin.properties 
b/examples/test_case_data/localmeta/kylin.properties
index 257d69c..805432b 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -137,6 +137,8 @@ kylin.security.saml.context-path=/kylin
 kylin.test.bcc.new.key=some-value
 kylin.engine.mr.config-override.test1=test1
 kylin.engine.mr.config-override.test2=test2 
+kylin.engine.mr.mem-hungry-config-override.test1=mem-test1
+kylin.engine.mr.mem-hungry-config-override.test2=mem-test2 
 kylin.job.lock=org.apache.kylin.job.lock.MockJobLockDup
 kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 
diff --git a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml 
b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
index 5094d24..80b1d4d 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
@@ -16,6 +16,10 @@
   limitations under the License.
 -->
 <configuration>
+    <property>
+        <name>mapreduce.job.is-mem-hungry</name>
+        <value>true</value>
+    </property>
 
     <property>
         <name>mapreduce.job.split.metainfo.maxsize</name>

-- 
To stop receiving notification emails like this one, please contact
liy...@apache.org.

Reply via email to