Repository: incubator-eagle Updated Branches: refs/heads/master 02aab65b7 -> 83bb66661
[EAGLE-539] add failure category and job count by state to mr feeder https://issues.apache.org/jira/browse/EAGLE-539 1. We need to classify the errors generated by MR jobs so that we can support analyses such as top-n errors or top-n hosts that generate errors. I implemented this by using the error messages generated by the tasks and extracting the classification rules from them. 2. Another requirement is to get the job count by job status. When we parse jobs in the MR history feeder, we can save the job id and job status in ZooKeeper, and then we can flush them to the Eagle server. Author: wujinhu <wujinhu...@126.com> Closes #434 from wujinhu/EAGLE-539. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/83bb6666 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/83bb6666 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/83bb6666 Branch: refs/heads/master Commit: 83bb66661b46e8d69497e9459d7cca78264a1300 Parents: 02aab65 Author: wujinhu <wujinhu...@126.com> Authored: Mon Sep 19 18:04:17 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Mon Sep 19 18:04:17 2016 +0800 ---------------------------------------------------------------------- .../mr/historyentity/JobExecutionAPIEntity.java | 13 ++++++ .../metrics/JobCountMetricsGenerator.java | 43 ++++++++++++++------ .../JobExecutionMetricsCreationListener.java | 10 +++++ .../mr/history/parser/JHFEventReaderBase.java | 33 +++++++-------- .../mr/history/parser/TaskFailureListener.java | 1 + .../jpm/mr/running/parser/MRJobParser.java | 3 +- .../org/apache/eagle/jpm/util/Constants.java | 3 +- .../java/org/apache/eagle/jpm/util/Utils.java | 26 ++++++++++++ 8 files changed, 99 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java index f7540d5..0e40099 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java @@ -23,6 +23,8 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; +import java.util.Map; + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa") @ColumnFamily("f") @@ -91,6 +93,8 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { private int failedReduceAttempts; @Column("ad") private String trackingUrl; + @Column("ae") + private Map<String, Map<String, String>> failedTasks; public String getTrackingUrl() { return trackingUrl; @@ -343,4 +347,13 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { this.failedReduceAttempts = failedReduceAttempts; valueChanged("failedReduceAttempts"); } + + public Map<String, Map<String, String>> getFailedTasks() { + return failedTasks; + } + + public void setFailedTasks(Map<String, Map<String, String>> failedTasks) { + this.failedTasks = failedTasks; + valueChanged("failedTasks"); + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java index a6f9d56..ac4a33d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java @@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -43,12 +44,19 @@ public class JobCountMetricsGenerator { public void flush(String date, int year, int month, int day) throws Exception { List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(date); final int total = jobs.size(); - int fail = 0; + int succeeded = 0; + int killed = 0; + for (Pair<String, String> job : jobs) { - if (!job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) { - ++fail; + if (job.getRight().equals(EagleJobStatus.KILLED.toString())) { + ++killed; + } + + if (job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) { + ++succeeded; } } + int failed = total - killed - succeeded; final IEagleServiceClient client = new EagleServiceClientImpl( MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, @@ -59,24 +67,33 @@ public class JobCountMetricsGenerator { GregorianCalendar cal = new GregorianCalendar(year, month, day); cal.setTimeZone(timeZone); + + List<GenericMetricEntity> entities = new ArrayList<>(); + 
entities.add(generateEntity(cal, EagleJobStatus.FAILED.toString(), failed)); + entities.add(generateEntity(cal, EagleJobStatus.KILLED.toString(), killed)); + entities.add(generateEntity(cal, EagleJobStatus.SUCCEEDED.toString(), succeeded)); + + LOG.info("start flushing entities of total number " + entities.size()); + client.create(entities); + LOG.info("finish flushing entities of total number " + entities.size()); + client.getJerseyClient().destroy(); + client.close(); + } + + private GenericMetricEntity generateEntity(GregorianCalendar calendar, String state, int count) { GenericMetricEntity metricEntity = new GenericMetricEntity(); - metricEntity.setTimestamp(cal.getTimeInMillis()); - metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY); - metricEntity.setValue(new double[] {total, fail}); + metricEntity.setTimestamp(calendar.getTimeInMillis()); + metricEntity.setPrefix(String.format(Constants.HADOOP_HISTORY_TOTAL_METRIC_FORMAT, Constants.JOB_LEVEL, Constants.JOB_COUNT_PER_DAY)); + metricEntity.setValue(new double[] {count}); @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site); + put(MRJobTagName.JOB_STATUS.toString(), state); } }; metricEntity.setTags(baseTags); - List<GenericMetricEntity> entities = new ArrayList<>(); - entities.add(metricEntity); - LOG.info("start flushing entities of total number " + entities.size()); - client.create(entities); - LOG.info("finish flushing entities of total number " + entities.size()); - client.getJerseyClient().destroy(); - client.close(); + return metricEntity; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java index ce788a3..3e0256f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java @@ -20,10 +20,12 @@ package org.apache.eagle.jpm.mr.history.metrics; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener; import org.apache.eagle.log.entity.GenericMetricEntity; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +63,14 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation } } } + + //generate Constants.JOB_COUNT_PER_HOUR data + Map<String, String> baseTags = new HashMap<>(tags); + baseTags.put(MRJobTagName.JOB_STATUS.toString(), entity.getCurrentState()); + metrics.add(metricWrapper(timeStamp / 3600000 * 3600000, + Constants.JOB_COUNT_PER_HOUR, + new double[]{1}, + baseTags)); } return metrics; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index d33c26b..c87f4b2 100644 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -18,6 +18,8 @@ package org.apache.eagle.jpm.mr.history.parser; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator; @@ -26,6 +28,7 @@ import org.apache.eagle.jpm.mr.historyentity.*; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.JobNameNormalization; import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.jpm.util.Utils; import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.jobhistory.EventType; @@ -79,25 +82,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl private JobCounterMetricsGenerator jobCounterMetricsGenerator; - public Constants.JobType fetchJobType(Configuration config) { - if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { - return Constants.JobType.CASCADING; - } - if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { - return Constants.JobType.HIVE; - } - if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { - return Constants.JobType.PIG; - } - if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) { - return Constants.JobType.SCOOBI; - } - return Constants.JobType.NOTAVALIABLE; - } - /** * baseTags stores the basic tag name values which might be used for persisting various entities. 
- * baseTags includes: cluster, datacenter and jobName + * baseTags includes: site and jobName * baseTags are used for all job/task related entities * * @param baseTags @@ -119,6 +106,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl jobExecutionEntity.setTags(new HashMap<>(baseTags)); jobExecutionEntity.setNumFailedMaps(0); jobExecutionEntity.setNumFailedReduces(0); + jobExecutionEntity.setFailedTasks(new HashMap<>()); taskRunningHosts = new HashMap<>(); @@ -130,7 +118,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl this.configuration = configuration; if (this.configuration != null && this.jobType == null) { - this.setJobType(fetchJobType(this.configuration).toString()); + this.setJobType(Utils.fetchJobType(this.configuration).toString()); } this.sumMapTaskDuration = 0L; this.sumReduceTaskDuration = 0L; @@ -451,6 +439,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } entityCreated(entity); + if (entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()) != null) { + jobExecutionEntity.getFailedTasks().put(taskID, + new HashMap<String, String>() { + { + put(entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()), entity.getError()); + } + } + ); + } taskAttemptStartTime.remove(taskAttemptID); } else { // silently ignore http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index 1a7a5fc..0c18133 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -93,6 +93,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { //TODO need optimize, match and then capture the data final String errCategory = classifier.classifyError(e.getError()); tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory); + entity.getTags().put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory); failureTask.setError(e.getError()); failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 5811f72..2e36bc4 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -452,7 +452,7 @@ public class MRJobParser implements Runnable { Utils.closeInputStream(is); } - Set<String> needFetchAttemptTasks = calcFetchCounterAndAttemptTaskId(tasks); + Set<String> needFetchAttemptTasks = new HashSet<>();//calcFetchCounterAndAttemptTaskId(tasks); for (MRTask task : tasks) { if (this.finishedTaskIds.contains(task.getId()) && !needFetchAttemptTasks.contains(task.getId())) { continue; @@ -530,6 +530,7 @@ public class MRJobParser implements Runnable { mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOD_DEF_ID.toString(), value); } } + mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), Utils.fetchJobType(config).toString()); 
mrJobEntityMap.get(jobId).setJobConfig(config); mrJobConfigs.put(jobId, config); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index c9bb387..8c561bd 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -178,7 +178,8 @@ public class Constants { public static final String JOB_LEVEL = "job"; public static final String TASK_LEVEL = "task"; public static final String USER_LEVEL = "user"; - public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count"; + public static final String JOB_COUNT_PER_DAY = "day.count"; + public static final String JOB_COUNT_PER_HOUR = "hour.count"; public static final String HADOOP_HISTORY_TOTAL_METRIC_FORMAT = "hadoop.%s.history.%s"; public static final String HADOOP_HISTORY_MINUTE_METRIC_FORMAT = "hadoop.%s.history.minute.%s"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java index d738439..91077df 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.util; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,9 @@ 
import java.io.InputStream; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); @@ -86,4 +90,26 @@ public class Utils { return 0L; } + + public static Constants.JobType fetchJobType(Map config) { + if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { + return Constants.JobType.CASCADING; + } + if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { + return Constants.JobType.HIVE; + } + if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { + return Constants.JobType.PIG; + } + if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) { + return Constants.JobType.SCOOBI; + } + return Constants.JobType.NOTAVALIABLE; + } + + public static Constants.JobType fetchJobType(Configuration config) { + Map<String, String> mapConfig = new HashMap<>(); + config.forEach(entry -> mapConfig.put(entry.getKey(), entry.getValue())); + return fetchJobType(mapConfig); + } }