Repository: incubator-eagle
Updated Branches:
  refs/heads/master 02aab65b7 -> 83bb66661


[EAGLE-539] add failure category and job count by state to mr feeder

https://issues.apache.org/jira/browse/EAGLE-539

1. We need to classify the errors generated by MR jobs so that we can support
analysis requirements such as top-N errors or top-N hosts that generate errors. I
implemented this by extracting classification rules from the error messages
generated by the tasks.
2. Another requirement is to get the job count by job status. When we parse jobs in
the MR history feeder, we save each job ID and its status in ZooKeeper, and then
flush the per-status job counts to the Eagle server.

Author: wujinhu <wujinhu...@126.com>

Closes #434 from wujinhu/EAGLE-539.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/83bb6666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/83bb6666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/83bb6666

Branch: refs/heads/master
Commit: 83bb66661b46e8d69497e9459d7cca78264a1300
Parents: 02aab65
Author: wujinhu <wujinhu...@126.com>
Authored: Mon Sep 19 18:04:17 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Mon Sep 19 18:04:17 2016 +0800

----------------------------------------------------------------------
 .../mr/historyentity/JobExecutionAPIEntity.java | 13 ++++++
 .../metrics/JobCountMetricsGenerator.java       | 43 ++++++++++++++------
 .../JobExecutionMetricsCreationListener.java    | 10 +++++
 .../mr/history/parser/JHFEventReaderBase.java   | 33 +++++++--------
 .../mr/history/parser/TaskFailureListener.java  |  1 +
 .../jpm/mr/running/parser/MRJobParser.java      |  3 +-
 .../org/apache/eagle/jpm/util/Constants.java    |  3 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   | 26 ++++++++++++
 8 files changed, 99 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index f7540d5..0e40099 100644
--- 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -23,6 +23,8 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
+import java.util.Map;
+
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eaglejpa")
 @ColumnFamily("f")
@@ -91,6 +93,8 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private int failedReduceAttempts;
     @Column("ad")
     private String trackingUrl;
+    @Column("ae")
+    private Map<String, Map<String, String>> failedTasks;
 
     public String getTrackingUrl() {
         return trackingUrl;
@@ -343,4 +347,13 @@ public class JobExecutionAPIEntity extends 
JobBaseAPIEntity {
         this.failedReduceAttempts = failedReduceAttempts;
         valueChanged("failedReduceAttempts");
     }
+
+    public Map<String, Map<String, String>> getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(Map<String, Map<String, String>> failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
index a6f9d56..ac4a33d 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -43,12 +44,19 @@ public class JobCountMetricsGenerator {
     public void flush(String date, int year, int month, int day) throws 
Exception {
         List<Pair<String, String>> jobs = 
JobHistoryZKStateManager.instance().getProcessedJobs(date);
         final int total = jobs.size();
-        int fail = 0;
+        int succeeded = 0;
+        int killed = 0;
+
         for (Pair<String, String> job : jobs) {
-            if (!job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) {
-                ++fail;
+            if (job.getRight().equals(EagleJobStatus.KILLED.toString())) {
+                ++killed;
+            }
+
+            if (job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) {
+                ++succeeded;
             }
         }
+        int failed = total - killed - succeeded;
 
         final IEagleServiceClient client = new EagleServiceClientImpl(
             MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
@@ -59,24 +67,33 @@ public class JobCountMetricsGenerator {
 
         GregorianCalendar cal = new GregorianCalendar(year, month, day);
         cal.setTimeZone(timeZone);
+
+        List<GenericMetricEntity> entities = new ArrayList<>();
+        entities.add(generateEntity(cal, EagleJobStatus.FAILED.toString(), 
failed));
+        entities.add(generateEntity(cal, EagleJobStatus.KILLED.toString(), 
killed));
+        entities.add(generateEntity(cal, EagleJobStatus.SUCCEEDED.toString(), 
succeeded));
+
+        LOG.info("start flushing entities of total number " + entities.size());
+        client.create(entities);
+        LOG.info("finish flushing entities of total number " + 
entities.size());
+        client.getJerseyClient().destroy();
+        client.close();
+    }
+
+    private GenericMetricEntity generateEntity(GregorianCalendar calendar, 
String state, int count) {
         GenericMetricEntity metricEntity = new GenericMetricEntity();
-        metricEntity.setTimestamp(cal.getTimeInMillis());
-        metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY);
-        metricEntity.setValue(new double[] {total, fail});
+        metricEntity.setTimestamp(calendar.getTimeInMillis());
+        
metricEntity.setPrefix(String.format(Constants.HADOOP_HISTORY_TOTAL_METRIC_FORMAT,
 Constants.JOB_LEVEL, Constants.JOB_COUNT_PER_DAY));
+        metricEntity.setValue(new double[] {count});
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
                 put("site", 
MRHistoryJobConfig.get().getJobExtractorConfig().site);
+                put(MRJobTagName.JOB_STATUS.toString(), state);
             }
         };
         metricEntity.setTags(baseTags);
-        List<GenericMetricEntity> entities = new ArrayList<>();
-        entities.add(metricEntity);
 
-        LOG.info("start flushing entities of total number " + entities.size());
-        client.create(entities);
-        LOG.info("finish flushing entities of total number " + 
entities.size());
-        client.getJerseyClient().destroy();
-        client.close();
+        return metricEntity;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
index ce788a3..3e0256f 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
@@ -20,10 +20,12 @@ package org.apache.eagle.jpm.mr.history.metrics;
 
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,6 +63,14 @@ public class JobExecutionMetricsCreationListener extends 
AbstractMetricsCreation
                     }
                 }
             }
+
+            //generate Constants.JOB_COUNT_PER_HOUR data
+            Map<String, String> baseTags = new HashMap<>(tags);
+            baseTags.put(MRJobTagName.JOB_STATUS.toString(), 
entity.getCurrentState());
+            metrics.add(metricWrapper(timeStamp / 3600000 * 3600000,
+                Constants.JOB_COUNT_PER_HOUR,
+                new double[]{1},
+                baseTags));
         }
         return metrics;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d33c26b..c87f4b2 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator;
@@ -26,6 +28,7 @@ import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.JobNameNormalization;
 import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.Utils;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -79,25 +82,9 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
 
     private JobCounterMetricsGenerator jobCounterMetricsGenerator;
 
-    public Constants.JobType fetchJobType(Configuration config) {
-        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
-            return Constants.JobType.CASCADING;
-        }
-        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
-            return Constants.JobType.HIVE;
-        }
-        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
-            return Constants.JobType.PIG;
-        }
-        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
-            return Constants.JobType.SCOOBI;
-        }
-        return Constants.JobType.NOTAVALIABLE;
-    }
-
     /**
      * baseTags stores the basic tag name values which might be used for 
persisting various entities.
-     * baseTags includes: cluster, datacenter and jobName
+     * baseTags includes: site and jobName
      * baseTags are used for all job/task related entities
      *
      * @param baseTags
@@ -119,6 +106,7 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
         jobExecutionEntity.setTags(new HashMap<>(baseTags));
         jobExecutionEntity.setNumFailedMaps(0);
         jobExecutionEntity.setNumFailedReduces(0);
+        jobExecutionEntity.setFailedTasks(new HashMap<>());
 
         taskRunningHosts = new HashMap<>();
 
@@ -130,7 +118,7 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
         this.configuration = configuration;
 
         if (this.configuration != null && this.jobType == null) {
-            this.setJobType(fetchJobType(this.configuration).toString());
+            this.setJobType(Utils.fetchJobType(this.configuration).toString());
         }
         this.sumMapTaskDuration = 0L;
         this.sumReduceTaskDuration = 0L;
@@ -451,6 +439,15 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
             }
 
             entityCreated(entity);
+            if (entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()) 
!= null) {
+                jobExecutionEntity.getFailedTasks().put(taskID,
+                    new HashMap<String, String>() {
+                        {
+                            
put(entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()), 
entity.getError());
+                        }
+                    }
+                );
+            }
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 1a7a5fc..0c18133 100755
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -93,6 +93,7 @@ public class TaskFailureListener implements 
HistoryJobEntityCreationListener {
         //TODO need optimize, match and then capture the data
         final String errCategory = classifier.classifyError(e.getError());
         tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
+        entity.getTags().put(MRJobTagName.ERROR_CATEGORY.toString(), 
errCategory);
 
         failureTask.setError(e.getError());
         failureTask.setFailureCount(1); // hard coded to 1 unless we do 
pre-aggregation in the future

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 5811f72..2e36bc4 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -452,7 +452,7 @@ public class MRJobParser implements Runnable {
             Utils.closeInputStream(is);
         }
 
-        Set<String> needFetchAttemptTasks = 
calcFetchCounterAndAttemptTaskId(tasks);
+        Set<String> needFetchAttemptTasks = new 
HashSet<>();//calcFetchCounterAndAttemptTaskId(tasks);
         for (MRTask task : tasks) {
             if (this.finishedTaskIds.contains(task.getId()) && 
!needFetchAttemptTasks.contains(task.getId())) {
                 continue;
@@ -530,6 +530,7 @@ public class MRJobParser implements Runnable {
                     
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOD_DEF_ID.toString(), 
value);
                 }
             }
+            
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), 
Utils.fetchJobType(config).toString());
             mrJobEntityMap.get(jobId).setJobConfig(config);
             mrJobConfigs.put(jobId, config);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index c9bb387..8c561bd 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -178,7 +178,8 @@ public class Constants {
     public static final String JOB_LEVEL = "job";
     public static final String TASK_LEVEL = "task";
     public static final String USER_LEVEL = "user";
-    public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count";
+    public static final String JOB_COUNT_PER_DAY = "day.count";
+    public static final String JOB_COUNT_PER_HOUR = "hour.count";
 
     public static final String HADOOP_HISTORY_TOTAL_METRIC_FORMAT = 
"hadoop.%s.history.%s";
     public static final String HADOOP_HISTORY_MINUTE_METRIC_FORMAT = 
"hadoop.%s.history.minute.%s";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index d738439..91077df 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.util;
 
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +26,9 @@ import java.io.InputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -86,4 +90,26 @@ public class Utils {
 
         return 0L;
     }
+    
+    public static Constants.JobType fetchJobType(Map config) {
+        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
+            return Constants.JobType.CASCADING;
+        }
+        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
+            return Constants.JobType.HIVE;
+        }
+        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
+            return Constants.JobType.PIG;
+        }
+        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
+            return Constants.JobType.SCOOBI;
+        }
+        return Constants.JobType.NOTAVALIABLE;
+    }
+
+    public static Constants.JobType fetchJobType(Configuration config) {
+        Map<String, String> mapConfig = new HashMap<>();
+        config.forEach(entry -> mapConfig.put(entry.getKey(), 
entry.getValue()));
+        return fetchJobType(mapConfig);
+    }
 }

Reply via email to