Repository: kylin Updated Branches: refs/heads/yaho-cube-planner 02b67cebf -> 8491905eb
APACHE-KYLIN-2723: refactor JobMetricsFacade Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8491905e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8491905e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8491905e Branch: refs/heads/yaho-cube-planner Commit: 8491905eb89d7cbc5b1e284a5f5ceed9d7321b9e Parents: 02b67ce Author: Zhong <nju_y...@apache.org> Authored: Fri Aug 11 06:27:55 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Fri Aug 11 06:27:55 2017 +0800 ---------------------------------------------------------------------- .../kylin/job/metrics/JobMetricsFacade.java | 88 ++++++++++++++++++-- .../org/apache/kylin/engine/mr/CubingJob.java | 85 +++++++------------ .../kylin/rest/metrics/QueryMetricsFacade.java | 12 +-- 3 files changed, 115 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java index 9484350..9762b81 100644 --- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java +++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java @@ -18,7 +18,10 @@ package org.apache.kylin.job.metrics; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; import org.apache.kylin.metrics.property.JobPropertyEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +29,25 @@ import org.slf4j.LoggerFactory; public class JobMetricsFacade { private static final Logger logger = LoggerFactory.getLogger(JobMetricsFacade.class); - public static void setJobWrapper(RecordEvent metricsEvent, String projectName, String cubeName, String jobId, + public static void updateMetrics(JobStatisticsResult jobStats) { + /** + * report job related metrics + */ + RecordEvent metricsEvent; + if (jobStats.throwable == null) { + metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()); + setJobWrapper(metricsEvent, jobStats.projectName, jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType); + setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, jobStats.buildDuration, jobStats.waitResourceTime, jobStats.perBytesTimeCost, // + jobStats.dColumnDistinct, jobStats.dDictBuilding, jobStats.dCubingInmem, jobStats.dHfileConvert); + } else { + metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()); + setJobExceptionWrapper(metricsEvent, jobStats.projectName, jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType, // + jobStats.throwable.getClass()); + } + MetricsManager.getInstance().update(metricsEvent); + } + + private static void setJobWrapper(RecordEvent metricsEvent, String projectName, String cubeName, String jobId, String jobType, String cubingType) { metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName); metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName); @@ -35,26 +56,75 @@ public class JobMetricsFacade { metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType); } - public static void setJobStats(RecordEvent metricsEvent, long tableSize, long cubeSize, long buildDuration, - long waitResourceTime, double perBytesTimeCost) { + private static void setJobStats(RecordEvent metricsEvent, long tableSize, long cubeSize, long buildDuration, + long waitResourceTime, double perBytesTimeCost, long dColumnDistinct, long dDictBuilding, + long dCubingInmem, long dHfileConvert) { metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize); metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize); metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration); metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime); metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost); - } - - public static void setJobStepStats(RecordEvent metricsEvent, long dColumnDistinct, long dDictBuilding, - long dCubingInmem, long dHfileConvert) { metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct); metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding); metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem); metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert); } - public static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String projectName, - String cubeName, String jobId, String jobType, String cubingType, Class<T> throwableClass) { + private static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String projectName, + String cubeName, String jobId, String jobType, String cubingType, Class<T> throwableClass) { setJobWrapper(metricsEvent, projectName, cubeName, jobId, jobType, cubingType); metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwableClass.getName()); } + + public static class JobStatisticsResult { + // dimensions + private String projectName; + private String cubeName; + private String jobId; + private String jobType; + private String cubingType; + + // statistics + private long tableSize; + private long cubeSize; + private long buildDuration; + private long waitResourceTime; + private double perBytesTimeCost; + + // step statistics + private long dColumnDistinct = 0L; + private long dDictBuilding = 0L; + private long dCubingInmem = 0L; + private long dHfileConvert = 0L; + + // exception + private Throwable throwable; + + public void setWrapper(String projectName, String cubeName, String jobId, String jobType, String cubingType) { + this.projectName = projectName; + this.cubeName = cubeName; + this.jobId = jobId; + this.jobType = jobType; + this.cubingType = cubingType; + } + + public void setJobStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime, double perBytesTimeCost) { + this.tableSize = tableSize; + this.cubeSize = cubeSize; + this.buildDuration = buildDuration; + this.waitResourceTime = waitResourceTime; + this.perBytesTimeCost = perBytesTimeCost; + } + + public void setJobStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) { + this.dColumnDistinct = dColumnDistinct; + this.dDictBuilding = dDictBuilding; + this.dCubingInmem = dCubingInmem; + this.dHfileConvert = dHfileConvert; + } + + public void setJobException(Throwable throwable) { + this.throwable = throwable; + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 79f40ae..9d493aa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -18,18 +18,7 @@ package org.apache.kylin.engine.mr; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.TimeZone; -import java.util.regex.Matcher; - +import com.google.common.base.Strings; import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.StringUtil; @@ -49,13 +38,20 @@ import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.metrics.JobMetricsFacade; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.metrics.MetricsManager; -import org.apache.kylin.metrics.lib.impl.RecordEvent; -import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Strings; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; +import java.util.regex.Matcher; /** */ @@ -245,50 +241,29 @@ public class CubingJob extends DefaultChainedExecutable { protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) { super.onStatusChange(context, result, state); - /** - * report job related metrics - */ + updateMetrics(context, result, state); + } + + protected void updateMetrics(ExecutableContext context, ExecuteResult result, ExecutableState state) { + JobMetricsFacade.JobStatisticsResult jobStats = new JobMetricsFacade.JobStatisticsResult(); + jobStats.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()), + CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(), + getAlgorithm() == null ? "NULL" : getAlgorithm().toString()); + if (state == ExecutableState.SUCCEED) { - RecordEvent metricsEvent = new TimedRecordEvent( - KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()); - JobMetricsFacade.setJobWrapper(metricsEvent, // - ProjectInstance.getNormalizedProjectName(getProjectName()), - CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(), - getAlgorithm() == null ? "NULL" : getAlgorithm().toString()); - - long tableSize = findSourceSizeBytes(); - long buildDuration = getDuration(); - long waitResourceTime = getMapReduceWaitTime(); - JobMetricsFacade.setJobStats(metricsEvent, // - tableSize, findCubeSizeBytes(), buildDuration, waitResourceTime, - getPerBytesTimeCost(tableSize, buildDuration - waitResourceTime)); - long dColumnDistinct = 0L; - long dDictBuilding = 0L; - long dCubingInmem = 0L; - long dHfileConvert = 0L; + jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), getDuration(), getMapReduceWaitTime(), + getPerBytesTimeCost(findSourceSizeBytes(), getDuration() - getMapReduceWaitTime())); if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) { - dColumnDistinct = getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(); - dDictBuilding = getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(); - dCubingInmem = getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(); - dHfileConvert = getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration(); + jobStats.setJobStepStats( + getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration()); } - JobMetricsFacade.setJobStepStats(metricsEvent, // - dColumnDistinct, dDictBuilding, dCubingInmem, dHfileConvert); - - MetricsManager.getInstance().update(metricsEvent); } else if (state == ExecutableState.ERROR) { - RecordEvent metricsEvent = new TimedRecordEvent( - KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()); - - Class throwable = result.getThrowable() != null ? result.getThrowable().getClass() : Exception.class; - JobMetricsFacade.setJobExceptionWrapper(metricsEvent, // - ProjectInstance.getNormalizedProjectName(getProjectName()), - CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(), - getAlgorithm() == null ? "NULL" : getAlgorithm().toString(), throwable); - - MetricsManager.getInstance().update(metricsEvent); + jobStats.setJobException(result.getThrowable() != null ? result.getThrowable() : new Exception()); } - + JobMetricsFacade.updateMetrics(jobStats); } private static double getPerBytesTimeCost(long size, long timeCost) { http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index 67ef203..938488a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -136,7 +136,7 @@ public class QueryMetricsFacade { } } - public static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName, + private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName, String rpcServer, Throwable throwable) { metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName); metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName); @@ -145,7 +145,7 @@ public class QueryMetricsFacade { throwable == null ? "NULL" : throwable.getClass().getName()); } - public static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount, + private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount) { metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs); metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter @@ -155,7 +155,7 @@ public class QueryMetricsFacade { metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor } - public static void setCubeWrapper(RecordEvent metricsEvent, String projectName, String cubeName, String segmentName, + private static void setCubeWrapper(RecordEvent metricsEvent, String projectName, String cubeName, String segmentName, long sourceCuboidId, long targetCuboidId, long filterMask) { metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), projectName); metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName); @@ -166,7 +166,7 @@ public class QueryMetricsFacade { metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), filterMask); } - public static void setCubeStats(RecordEvent metricsEvent, long callCount, long callTimeSum, long callTimeMax, + private static void setCubeStats(RecordEvent metricsEvent, long callCount, long callTimeSum, long callTimeMax, long skipCount, long scanCount, long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) { metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), callCount); metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), callTimeSum); @@ -180,7 +180,7 @@ public class QueryMetricsFacade { metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit); } - public static void setQueryWrapper(RecordEvent metricsEvent, long queryHashCode, String queryType, + private static void setQueryWrapper(RecordEvent metricsEvent, long queryHashCode, String queryType, String projectName, String realizationName, int realizationType, Throwable throwable) { metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), queryHashCode); metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType); @@ -191,7 +191,7 @@ public class QueryMetricsFacade { throwable == null ? "NULL" : throwable.getClass().getName()); } - public static void setQueryStats(RecordEvent metricsEvent, long callTimeMs, long returnCountByCalcite, + private static void setQueryStats(RecordEvent metricsEvent, long callTimeMs, long returnCountByCalcite, long returnCountByStorage) { metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs); metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite);