This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to the "sync" branch of the repository https://gitbox.apache.org/repos/asf/kylin.git
commit 451b7bae796e47358900678ac18dce24a2e6d8df Author: Yifei Wu <vaful...@gmail.com> AuthorDate: Tue Feb 27 14:34:23 2018 +0800 KYLIN-3267 MR config override at project/cube level only for mem-hungry build steps --- build/conf/kylin_job_conf_inmem.xml | 4 ++ .../org/apache/kylin/common/KylinConfigBase.java | 5 +++ .../engine/mr/common/MapReduceExecutable.java | 21 ++++++--- .../engine/mr/common/MapReduceExecutableTest.java | 50 ++++++++++++++++++++++ examples/test_case_data/localmeta/kylin.properties | 2 + .../sandbox/kylin_job_conf_inmem.xml | 4 ++ 6 files changed, 80 insertions(+), 6 deletions(-) diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml index d69435a..ddda4dd 100644 --- a/build/conf/kylin_job_conf_inmem.xml +++ b/build/conf/kylin_job_conf_inmem.xml @@ -15,6 +15,10 @@ limitations under the License. --> <configuration> + <property> + <name>mapreduce.job.is-mem-hungry</name> + <value>true</value> + </property> <property> <name>mapreduce.job.split.metainfo.maxsize</name> diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index eee2d09..7596d2c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1079,6 +1079,11 @@ abstract public class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.engine.mr.config-override."); } + // used for some mem-hungry step + public Map<String, String> getMemHungryConfigOverride() { + return getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override."); + } + public Map<String, String> getUHCMRConfigOverride() { return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override."); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index ca1311d..ae285cc 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -120,7 +120,8 @@ public class MapReduceExecutable extends AbstractExecutable { job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed"); } else { - final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); + final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil + .forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); final AbstractHadoopJob hadoopJob = constructor.newInstance(); hadoopJob.setConf(conf); hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away @@ -154,7 +155,8 @@ public class MapReduceExecutable extends AbstractExecutable { mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin"); return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin"); } - if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { + if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED + || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { final long waitTime = System.currentTimeMillis() - getStartTime(); setMapReduceWaitTime(waitTime); } @@ -262,24 +264,31 @@ public class MapReduceExecutable extends AbstractExecutable { options.addOption(OPTION_CUBE_NAME); CustomParser parser = new CustomParser(); CommandLine commandLine = parser.parse(options, jobParams); - + String confFile = 
commandLine.getOptionValue(BatchConstants.ARG_CONF); String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME); List<String> remainingArgs = Lists.newArrayList(); - + if (StringUtils.isNotBlank(confFile)) { conf.addResource(new Path(confFile)); } - + if (StringUtils.isNotBlank(cubeName)) { for (Map.Entry<String, String> entry : CubeManager.getInstance(config).getCube(cubeName).getConfig() .getMRConfigOverride().entrySet()) { conf.set(entry.getKey(), entry.getValue()); } + if (conf.get("mapreduce.job.is-mem-hungry") != null + && Boolean.valueOf(conf.get("mapreduce.job.is-mem-hungry"))) { + for (Map.Entry<String, String> entry : CubeManager.getInstance(config).getCube(cubeName).getConfig() + .getMemHungryConfigOverride().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + } remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME); remainingArgs.add(cubeName); } - + remainingArgs.addAll(parser.getRemainingArgs()); return (String[]) remainingArgs.toArray(new String[remainingArgs.size()]); } diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java new file mode 100644 index 0000000..6e4a57d --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/MapReduceExecutableTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.lang.reflect.Method; + +import org.apache.hadoop.conf.Configuration; +import org.apache.kylin.common.HotLoadKylinPropertiesTestCase; +import org.apache.kylin.common.KylinConfig; +import org.junit.Assert; +import org.junit.Test; + +public class MapReduceExecutableTest extends HotLoadKylinPropertiesTestCase { + + @Test + public void testOverwriteJobConf() throws Exception { + MapReduceExecutable executable = new MapReduceExecutable(); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + + Method method = MapReduceExecutable.class.getDeclaredMethod("overwriteJobConf", Configuration.class, + KylinConfig.class, new String[] {}.getClass()); + method.setAccessible(true); + Configuration conf = new Configuration(); + conf.set("mapreduce.job.is-mem-hungry", "true"); + method.invoke(executable, conf, config, new String[] { "-cubename", "ci_inner_join_cube" }); + Assert.assertEquals("mem-test1", conf.get("test1")); + Assert.assertEquals("mem-test2", conf.get("test2")); + + conf.clear(); + method.invoke(executable, conf, config, new String[] { "-cubename", "ci_inner_join_cube" }); + Assert.assertEquals("test1", conf.get("test1")); + Assert.assertEquals("test2", conf.get("test2")); + } +} \ No newline at end of file diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index 257d69c..805432b 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -137,6 +137,8 @@ 
kylin.security.saml.context-path=/kylin kylin.test.bcc.new.key=some-value kylin.engine.mr.config-override.test1=test1 kylin.engine.mr.config-override.test2=test2 +kylin.engine.mr.mem-hungry-config-override.test1=mem-test1 +kylin.engine.mr.mem-hungry-config-override.test2=mem-test2 kylin.job.lock=org.apache.kylin.job.lock.MockJobLockDup kylin.job.lock=org.apache.kylin.job.lock.MockJobLock diff --git a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml index 5094d24..80b1d4d 100644 --- a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml +++ b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml @@ -16,6 +16,10 @@ limitations under the License. --> <configuration> + <property> + <name>mapreduce.job.is-mem-hungry</name> + <value>true</value> + </property> <property> <name>mapreduce.job.split.metainfo.maxsize</name> -- To stop receiving notification emails like this one, please contact liy...@apache.org.