Repository: kylin Updated Branches: refs/heads/master 094510cf3 -> 8de31563c
KYLIN-1546 Add tool to extract job related information Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8de31563 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8de31563 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8de31563 Branch: refs/heads/master Commit: 8de31563cc11f7babdb8c171f64224b63785ab8d Parents: 9e31a19 Author: lidongsjtu <lid...@apache.org> Authored: Tue Mar 29 19:56:24 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Tue Mar 29 20:04:16 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/admin/JobInfoExtractor.java | 160 +++++++++++++++++++ .../apache/kylin/admin/YarnLogExtractor.java | 134 ++++++++++++++++ 2 files changed, 294 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8de31563/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java ---------------------------------------------------------------------- diff --git a/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java b/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java new file mode 100644 index 0000000..fadccae --- /dev/null +++ b/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.admin; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.ResourceTool; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.manager.ExecutableManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Created by dongli on 3/29/16. + */ +public class JobInfoExtractor extends AbstractApplication { + private static final Logger logger = LoggerFactory.getLogger(JobInfoExtractor.class); + + @SuppressWarnings("static-access") + private static final Option OPTION_JOB_ID = OptionBuilder.withArgName("jobId").hasArg().isRequired(true).withDescription("specify the Job ID to extract information. ").create("jobId"); + + @SuppressWarnings("static-access") + private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(true).withDescription("specify the dest dir to save the related information").create("destDir"); + + @SuppressWarnings("static-access") + private static final Option OPTION_INCLUDE_CUBE = OptionBuilder.withArgName("includeCube").hasArg().isRequired(false).withDescription("set this to true if want to extract related cube info too. Default true").create("includeCube"); + + @SuppressWarnings("static-access") + private static final Option OPTION_INCLUDE_YARN_LOGS = OptionBuilder.withArgName("includeYarnLogs").hasArg().isRequired(false).withDescription("set this to true if want to extract related yarn logs too. Default true").create("includeYarnLogs"); + + private Options options; + + private KylinConfig kylinConfig; + private CubeMetaExtractor cubeMetaExtractor; + private YarnLogExtractor yarnLogExtractor; + + private ExecutableDao executableDao; + private ExecutableManager executableManager; + + List<String> requiredResources = Lists.newArrayList(); + + public JobInfoExtractor() { + cubeMetaExtractor = new CubeMetaExtractor(); + yarnLogExtractor = new YarnLogExtractor(); + + options = new Options(); + options.addOption(OPTION_JOB_ID); + options.addOption(OPTION_DEST); + options.addOption(OPTION_INCLUDE_CUBE); + options.addOption(OPTION_INCLUDE_YARN_LOGS); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + executableDao = ExecutableDao.getInstance(kylinConfig); + executableManager = ExecutableManager.getInstance(kylinConfig); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String jobId = optionsHelper.getOptionValue(OPTION_JOB_ID); + String dest = optionsHelper.getOptionValue(OPTION_DEST); + boolean includeCube = optionsHelper.hasOption(OPTION_INCLUDE_CUBE) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CUBE)) : true; + boolean includeYarnLogs = optionsHelper.hasOption(OPTION_INCLUDE_YARN_LOGS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_YARN_LOGS)) : true; + + if (StringUtils.isEmpty(dest)) { + throw new RuntimeException("destDir is not set, exit directly without extracting"); + } + + if (!dest.endsWith("/")) { + dest = dest + "/"; + } + + ExecutablePO executablePO = executableDao.getJob(jobId); + addRequired(ExecutableDao.pathOfJob(jobId)); + addRequired(ExecutableDao.pathOfJobOutput(jobId)); + for (ExecutablePO task : executablePO.getTasks()) { + addRequired(ExecutableDao.pathOfJob(task.getUuid())); + addRequired(ExecutableDao.pathOfJobOutput(task.getUuid())); + } + executeExtraction(dest); + + if (includeCube) { + String cubeName = CubingExecutableUtil.getCubeName(executablePO.getParams()); + String[] cubeMetaArgs = { "-cube", cubeName, "-destDir", dest + "cube_" + cubeName + "/", "-includeJobs", "false" }; + logger.info("Start to extract related cube: " + StringUtils.join(cubeMetaArgs)); + cubeMetaExtractor.execute(cubeMetaArgs); + } + + if (includeYarnLogs) { + String[] yarnLogsArgs = { "-jobId", jobId, "-destDir", dest + "yarn_" + jobId + "/" }; + logger.info("Start to related yarn job logs: " + StringUtils.join(yarnLogsArgs)); + yarnLogExtractor.execute(yarnLogsArgs); + } + + logger.info("Extracted kylin jobs located at: " + new File(dest).getAbsolutePath()); + } + + private void executeExtraction(String dest) { + logger.info("The resource paths going to be extracted:"); + for (String s : requiredResources) { + logger.info(s + "(required)"); + } + + try { + ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); + ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest)); + + for (String path : requiredResources) { + ResourceTool.copyR(src, dst, path); + } + + } catch (IOException e) { + throw new RuntimeException("IOException", e); + } + } + + private void addRequired(String record) { + logger.info("adding required resource {}", record); + requiredResources.add(record); + } + + public static void main(String args[]) { + JobInfoExtractor extractor = new JobInfoExtractor(); + extractor.execute(args); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8de31563/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java ---------------------------------------------------------------------- diff --git a/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java b/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java new file mode 100644 index 0000000..354a3f9 --- /dev/null +++ b/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.admin; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutablePO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Created by dongli on 3/29/16. + */ +public class YarnLogExtractor extends AbstractApplication { + private static final Logger logger = LoggerFactory.getLogger(YarnLogExtractor.class); + + @SuppressWarnings("static-access") + private static final Option OPTION_JOB_ID = OptionBuilder.withArgName("jobId").hasArg().isRequired(true).withDescription("specify the Job ID to extract information. ").create("jobId"); + + @SuppressWarnings("static-access") + private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(true).withDescription("specify the dest dir to save the related information").create("destDir"); + + private Options options; + + private KylinConfig kylinConfig; + private ExecutableDao executableDao; + + List<String> requiredResources = Lists.newArrayList(); + + public YarnLogExtractor() { + options = new Options(); + options.addOption(OPTION_JOB_ID); + options.addOption(OPTION_DEST); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + executableDao = ExecutableDao.getInstance(kylinConfig); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String jobId = optionsHelper.getOptionValue(OPTION_JOB_ID); + String dest = optionsHelper.getOptionValue(OPTION_DEST); + + if (StringUtils.isEmpty(dest)) { + throw new RuntimeException("destDir is not set, exit directly without extracting"); + } + + if (!dest.endsWith("/")) { + dest = dest + "/"; + } + + ExecutablePO executablePO = executableDao.getJob(jobId); + for (ExecutablePO task : executablePO.getTasks()) { + addRequired(task.getUuid()); + } + executeExtraction(dest); + + logger.info("Extracted yarn logs located at: " + new File(dest).getAbsolutePath()); + } + + private void extractYarnLog(String taskId, String dest) throws Exception { + final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo(); + if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) { + String applicationId = jobInfo.get(ExecutableConstants.MR_JOB_ID).replace("job", "application"); + File destFile = new File(dest + applicationId + ".log"); + + ShellExecutable yarnExec = new ShellExecutable(); + yarnExec.setCmd("yarn logs -applicationId " + applicationId + " > " + destFile.getAbsolutePath()); + yarnExec.setName(yarnExec.getCmd()); + + logger.info(yarnExec.getCmd()); + kylinConfig.getCliCommandExecutor().execute(yarnExec.getCmd(), null); + } + } + + private void executeExtraction(String dest) throws Exception { + logger.info("The resource paths going to be extracted:"); + for (String taskId : requiredResources) { + logger.info(taskId + "(required)"); + } + + logger.info("Start to download yarn logs."); + FileUtils.forceMkdir(new File(dest)); + for (String taskId : requiredResources) { + extractYarnLog(taskId, dest); + } + } + + private void addRequired(String record) { + logger.info("adding required resource {}", record); + requiredResources.add(record); + } + + public static void main(String args[]) { + YarnLogExtractor extractor = new YarnLogExtractor(); + extractor.execute(args); + } +}