Repository: kylin
Updated Branches:
  refs/heads/yaho-cube-planner 02b67cebf -> 8491905eb


APACHE-KYLIN-2723: refactor JobMetricsFacade


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8491905e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8491905e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8491905e

Branch: refs/heads/yaho-cube-planner
Commit: 8491905eb89d7cbc5b1e284a5f5ceed9d7321b9e
Parents: 02b67ce
Author: Zhong <nju_y...@apache.org>
Authored: Fri Aug 11 06:27:55 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Fri Aug 11 06:27:55 2017 +0800

----------------------------------------------------------------------
 .../kylin/job/metrics/JobMetricsFacade.java     | 88 ++++++++++++++++++--
 .../org/apache/kylin/engine/mr/CubingJob.java   | 85 +++++++------------
 .../kylin/rest/metrics/QueryMetricsFacade.java  | 12 +--
 3 files changed, 115 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java 
b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
index 9484350..9762b81 100644
--- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@ -18,7 +18,10 @@
 
 package org.apache.kylin.job.metrics;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +29,25 @@ import org.slf4j.LoggerFactory;
 public class JobMetricsFacade {
     private static final Logger logger = 
LoggerFactory.getLogger(JobMetricsFacade.class);
 
-    public static void setJobWrapper(RecordEvent metricsEvent, String 
projectName, String cubeName, String jobId,
+    public static void updateMetrics(JobStatisticsResult jobStats) {
+        /**
+         * report job related metrics
+         */
+        RecordEvent metricsEvent;
+        if (jobStats.throwable == null) {
+            metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
+            setJobWrapper(metricsEvent, jobStats.projectName, 
jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType);
+            setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, 
jobStats.buildDuration, jobStats.waitResourceTime, jobStats.perBytesTimeCost, //
+                    jobStats.dColumnDistinct, jobStats.dDictBuilding, 
jobStats.dCubingInmem, jobStats.dHfileConvert);
+        } else {
+            metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
+            setJobExceptionWrapper(metricsEvent, jobStats.projectName, 
jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType, //
+                    jobStats.throwable.getClass());
+        }
+        MetricsManager.getInstance().update(metricsEvent);
+    }
+
+    private static void setJobWrapper(RecordEvent metricsEvent, String 
projectName, String cubeName, String jobId,
             String jobType, String cubingType) {
         metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
         metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
@@ -35,26 +56,75 @@ public class JobMetricsFacade {
         metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
     }
 
-    public static void setJobStats(RecordEvent metricsEvent, long tableSize, 
long cubeSize, long buildDuration,
-            long waitResourceTime, double perBytesTimeCost) {
+    private static void setJobStats(RecordEvent metricsEvent, long tableSize, 
long cubeSize, long buildDuration,
+                                    long waitResourceTime, double 
perBytesTimeCost, long dColumnDistinct, long dDictBuilding,
+                                    long dCubingInmem, long dHfileConvert) {
         metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
         metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
         metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 
buildDuration);
         metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
waitResourceTime);
         metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 
perBytesTimeCost);
-    }
-
-    public static void setJobStepStats(RecordEvent metricsEvent, long 
dColumnDistinct, long dDictBuilding,
-            long dCubingInmem, long dHfileConvert) {
         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), 
dColumnDistinct);
         metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 
dDictBuilding);
         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
dCubingInmem);
         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
dHfileConvert);
     }
 
-    public static <T extends Throwable> void 
setJobExceptionWrapper(RecordEvent metricsEvent, String projectName,
-            String cubeName, String jobId, String jobType, String cubingType, 
Class<T> throwableClass) {
+    private static <T extends Throwable> void 
setJobExceptionWrapper(RecordEvent metricsEvent, String projectName,
+                                                                     String 
cubeName, String jobId, String jobType, String cubingType, Class<T> 
throwableClass) {
         setJobWrapper(metricsEvent, projectName, cubeName, jobId, jobType, 
cubingType);
         metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), 
throwableClass.getName());
     }
+
+    public static class JobStatisticsResult {
+        // dimensions
+        private String projectName;
+        private String cubeName;
+        private String jobId;
+        private String jobType;
+        private String cubingType;
+
+        // statistics
+        private long tableSize;
+        private long cubeSize;
+        private long buildDuration;
+        private long waitResourceTime;
+        private double perBytesTimeCost;
+
+        // step statistics
+        private long dColumnDistinct = 0L;
+        private long dDictBuilding = 0L;
+        private long dCubingInmem = 0L;
+        private long dHfileConvert = 0L;
+
+        // exception
+        private Throwable throwable;
+
+        public void setWrapper(String projectName, String cubeName, String 
jobId, String jobType, String cubingType) {
+            this.projectName = projectName;
+            this.cubeName = cubeName;
+            this.jobId = jobId;
+            this.jobType = jobType;
+            this.cubingType = cubingType;
+        }
+
+        public void setJobStats(long tableSize, long cubeSize, long 
buildDuration, long waitResourceTime, double perBytesTimeCost) {
+            this.tableSize = tableSize;
+            this.cubeSize = cubeSize;
+            this.buildDuration = buildDuration;
+            this.waitResourceTime = waitResourceTime;
+            this.perBytesTimeCost = perBytesTimeCost;
+        }
+
+        public void setJobStepStats(long dColumnDistinct, long dDictBuilding, 
long dCubingInmem, long dHfileConvert) {
+            this.dColumnDistinct = dColumnDistinct;
+            this.dDictBuilding = dDictBuilding;
+            this.dCubingInmem = dCubingInmem;
+            this.dHfileConvert = dHfileConvert;
+        }
+
+        public void setJobException(Throwable throwable) {
+            this.throwable = throwable;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 79f40ae..9d493aa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,18 +18,7 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-
+import com.google.common.base.Strings;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
@@ -49,13 +38,20 @@ import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.metrics.JobMetricsFacade;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metrics.MetricsManager;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Strings;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
 
 /**
  */
@@ -245,50 +241,29 @@ public class CubingJob extends DefaultChainedExecutable {
     protected void onStatusChange(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
         super.onStatusChange(context, result, state);
 
-        /**
-         * report job related metrics
-         */
+        updateMetrics(context, result, state);
+    }
+
+    protected void updateMetrics(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
+        JobMetricsFacade.JobStatisticsResult jobStats = new 
JobMetricsFacade.JobStatisticsResult();
+        
jobStats.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+                CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
+                getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+
         if (state == ExecutableState.SUCCEED) {
-            RecordEvent metricsEvent = new TimedRecordEvent(
-                    
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
-            JobMetricsFacade.setJobWrapper(metricsEvent, //
-                    ProjectInstance.getNormalizedProjectName(getProjectName()),
-                    CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
-                    getAlgorithm() == null ? "NULL" : 
getAlgorithm().toString());
-
-            long tableSize = findSourceSizeBytes();
-            long buildDuration = getDuration();
-            long waitResourceTime = getMapReduceWaitTime();
-            JobMetricsFacade.setJobStats(metricsEvent, //
-                    tableSize, findCubeSizeBytes(), buildDuration, 
waitResourceTime,
-                    getPerBytesTimeCost(tableSize, buildDuration - 
waitResourceTime));
-            long dColumnDistinct = 0L;
-            long dDictBuilding = 0L;
-            long dCubingInmem = 0L;
-            long dHfileConvert = 0L;
+            jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), 
getDuration(), getMapReduceWaitTime(),
+                    getPerBytesTimeCost(findSourceSizeBytes(), getDuration() - 
getMapReduceWaitTime()));
             if (CubingJobTypeEnum.getByName(getJobType()) == 
CubingJobTypeEnum.BUILD) {
-                dColumnDistinct = 
getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration();
-                dDictBuilding = 
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration();
-                dCubingInmem = 
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration();
-                dHfileConvert = 
getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration();
+                jobStats.setJobStepStats(
+                        
getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(),
+                        
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(),
+                        
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(),
+                        
getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
             }
-            JobMetricsFacade.setJobStepStats(metricsEvent, //
-                    dColumnDistinct, dDictBuilding, dCubingInmem, 
dHfileConvert);
-
-            MetricsManager.getInstance().update(metricsEvent);
         } else if (state == ExecutableState.ERROR) {
-            RecordEvent metricsEvent = new TimedRecordEvent(
-                    
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
-
-            Class throwable = result.getThrowable() != null ? 
result.getThrowable().getClass() : Exception.class;
-            JobMetricsFacade.setJobExceptionWrapper(metricsEvent, //
-                    ProjectInstance.getNormalizedProjectName(getProjectName()),
-                    CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
-                    getAlgorithm() == null ? "NULL" : 
getAlgorithm().toString(), throwable);
-
-            MetricsManager.getInstance().update(metricsEvent);
+            jobStats.setJobException(result.getThrowable() != null ? 
result.getThrowable() : new Exception());
         }
-
+        JobMetricsFacade.updateMetrics(jobStats);
     }
 
     private static double getPerBytesTimeCost(long size, long timeCost) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/8491905e/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 67ef203..938488a 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -136,7 +136,7 @@ public class QueryMetricsFacade {
         }
     }
 
-    public static void setRPCWrapper(RecordEvent metricsEvent, String 
projectName, String realizationName,
+    private static void setRPCWrapper(RecordEvent metricsEvent, String 
projectName, String realizationName,
             String rpcServer, Throwable throwable) {
         metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
         metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), 
realizationName);
@@ -145,7 +145,7 @@ public class QueryMetricsFacade {
                 throwable == null ? "NULL" : throwable.getClass().getName());
     }
 
-    public static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, 
long skipCount, long scanCount,
+    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, 
long skipCount, long scanCount,
             long returnCount, long aggrCount) {
         metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 
callTimeMs);
         metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 
skipCount); //Number of skips on region servers based on region meta or fuzzy 
filter
@@ -155,7 +155,7 @@ public class QueryMetricsFacade {
         metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 
aggrCount); //Count aggregated by coprocessor
     }
 
-    public static void setCubeWrapper(RecordEvent metricsEvent, String 
projectName, String cubeName, String segmentName,
+    private static void setCubeWrapper(RecordEvent metricsEvent, String 
projectName, String cubeName, String segmentName,
             long sourceCuboidId, long targetCuboidId, long filterMask) {
         metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), 
projectName);
         metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName);
@@ -166,7 +166,7 @@ public class QueryMetricsFacade {
         metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), 
filterMask);
     }
 
-    public static void setCubeStats(RecordEvent metricsEvent, long callCount, 
long callTimeSum, long callTimeMax,
+    private static void setCubeStats(RecordEvent metricsEvent, long callCount, 
long callTimeSum, long callTimeMax,
             long skipCount, long scanCount, long returnCount, long aggrCount, 
boolean ifSuccess, double weightPerHit) {
         metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), 
callCount);
         metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), 
callTimeSum);
@@ -180,7 +180,7 @@ public class QueryMetricsFacade {
         metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), 
weightPerHit);
     }
 
-    public static void setQueryWrapper(RecordEvent metricsEvent, long 
queryHashCode, String queryType,
+    private static void setQueryWrapper(RecordEvent metricsEvent, long 
queryHashCode, String queryType,
             String projectName, String realizationName, int realizationType, 
Throwable throwable) {
         metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), queryHashCode);
         metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType);
@@ -191,7 +191,7 @@ public class QueryMetricsFacade {
                 throwable == null ? "NULL" : throwable.getClass().getName());
     }
 
-    public static void setQueryStats(RecordEvent metricsEvent, long 
callTimeMs, long returnCountByCalcite,
+    private static void setQueryStats(RecordEvent metricsEvent, long 
callTimeMs, long returnCountByCalcite,
             long returnCountByStorage) {
         metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs);
         metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), 
returnCountByCalcite);

Reply via email to