[EAGLE-538] Add MapReduce task-level APIs https://issues.apache.org/jira/browse/EAGLE-538
https://issues.apache.org/jira/browse/EAGLE-554 Author: Qingwen Zhao <qingwen...@gmail.com> Author: Zhao, Qingwen <qingwz...@ebay.com> Closes #432 from qingwen220/dataSkew. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cfd5c7f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cfd5c7f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cfd5c7f8 Branch: refs/heads/master Commit: cfd5c7f826a2816bd5bf8db194932d2b3ec9ff4c Parents: 020a5b3 Author: Qingwen Zhao <qingwen...@gmail.com> Authored: Thu Sep 22 15:59:49 2016 +0800 Committer: Zhao, Qingwen <qingwz...@ebay.com> Committed: Thu Sep 22 15:59:49 2016 +0800 ---------------------------------------------------------------------- .../eagle/service/app/TestServiceAppWithZk.java | 8 +- .../client/impl/EagleServiceBaseClient.java | 2 +- .../client/impl/EagleServiceClientImpl.java | 4 + .../eagle/service/jpm/MRJobCountHelper.java | 140 ----------- .../service/jpm/MRJobExecutionResource.java | 133 +++------- .../service/jpm/MRJobTaskCountResponse.java | 12 +- .../service/jpm/MRTaskExecutionResource.java | 241 +++++++++++++++++++ .../service/jpm/MRTaskExecutionResponse.java | 85 +++++++ .../apache/eagle/service/jpm/ResourceUtils.java | 87 +++++++ .../service/jpm/TaskCountByDurationHelper.java | 106 -------- .../eagle/service/jpm/count/MRJobCountImpl.java | 146 +++++++++++ .../service/jpm/count/MRTaskCountImpl.java | 124 ++++++++++ .../service/jpm/suggestion/AbstractGCFunc.java | 104 ++++++++ .../jpm/suggestion/AbstractInputFunc.java | 86 +++++++ .../eagle/service/jpm/suggestion/MapGCFunc.java | 38 +++ .../service/jpm/suggestion/MapInputFunc.java | 40 +++ .../service/jpm/suggestion/MapSpillFunc.java | 90 +++++++ .../service/jpm/suggestion/ReduceGCFunc.java | 38 +++ .../service/jpm/suggestion/ReduceInputFunc.java | 39 +++ .../service/jpm/suggestion/SuggestionFunc.java | 27 +++ 
.../jpm/TestJobCountPerBucketHelper.java | 90 ------- .../service/jpm/TestTaskCountPerJobHelper.java | 96 -------- .../service/jpm/count/TestMRJobCountImpl.java | 92 +++++++ .../service/jpm/count/TestMRTaskCountImpl.java | 129 ++++++++++ .../jpm/suggestion/TestDataSkewFunc.java | 89 +++++++ .../jpm/suggestion/TestTaskCounterFunc.java | 88 +++++++ .../spark/history/SparkHistoryJobAppConfig.java | 8 +- .../history/crawl/JHFSparkEventReader.java | 1 + .../jpm/spark/history/crawl/JHFSparkParser.java | 7 +- .../status/JobHistoryZKStateManager.java | 8 +- .../history/storm/SparkHistoryJobParseBolt.java | 4 +- .../history/storm/SparkHistoryJobSpout.java | 17 +- ...spark.history.SparkHistoryJobAppProvider.xml | 6 - .../src/main/resources/application.conf | 3 +- .../org/apache/eagle/jpm/util/Constants.java | 4 + .../eagle/jpm/util/jobcounter/JobCounters.java | 78 ++++++ eagle-server/pom.xml | 11 +- 37 files changed, 1701 insertions(+), 580 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java index a19889b..f7f6c57 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java @@ -31,18 +31,12 @@ import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; 
import org.apache.eagle.alert.utils.ZookeeperEmbedded; import org.apache.eagle.service.app.ServiceApp; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import com.google.common.base.Joiner; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -/** - * @author xiancli - */ public class TestServiceAppWithZk { ZookeeperEmbedded zkEmbed; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java index ea5d9a5..3b717d8 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java @@ -67,7 +67,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient { private final static Logger LOG = LoggerFactory.getLogger(EagleServiceBaseClient.class); - protected static final String DEFAULT_BASE_PATH = "/rest"; + public static final String DEFAULT_BASE_PATH = "/rest"; protected static final MediaType DEFAULT_MEDIA_TYPE = MediaType.APPLICATION_JSON_TYPE; protected static final String DEFAULT_HTTP_HEADER_CONTENT_TYPE = "application/json"; protected static final String CONTENT_TYPE = "Content-Type"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java index 6018469..912f1f7 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java @@ -45,6 +45,10 @@ public class EagleServiceClientImpl extends EagleServiceBaseClient { super(host, port, username, password); } + public EagleServiceClientImpl(String host, int port, String basePath, String username, String password){ + super(host, port, basePath, username, password); + } + private String getWholePath(String urlString){ return getBaseEndpoint() + urlString; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java deleted file mode 100644 index 2fa5c04..0000000 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.service.jpm; - -import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; -import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.MRJobTagName; -import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse; -import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount; - -import java.text.ParseException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class MRJobCountHelper { - - public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) { - for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) { - jobCounts.add(new UnitJobCount(i * intervalInSecs * DateTimeUtil.ONESECOND)); - } - } - - public String moveTimeForwardOneDay(String startTime) throws ParseException { - long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime); - timeInSecs -= 24L * 60L * 60L; - return DateTimeUtil.secondsToHumanDate(timeInSecs); - } - - public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> jobDurations, - List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs, - long startTimeInSecs, - long endTimeInSecs, - long intervalInSecs) { - List<UnitJobCount> jobCounts = new ArrayList<>(); - Set<String> jobTypes = new HashSet<>(); - initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs); - for (JobExecutionAPIEntity jobDuration: jobDurations) { - 
String jobType = jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()); - jobTypes.add(jobType); - countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobType); - } - for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : runningJobs) { - if (job.getInternalState() != null && !job.getInternalState().equalsIgnoreCase(Constants.JobState.FINISHED.toString())) { - String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString()); - jobTypes.add(jobType); - countJob(jobCounts, job.getStartTime() / 1000, endTimeInSecs, intervalInSecs, jobType); - } - } - JobCountResponse response = new JobCountResponse(); - response.jobCounts = jobCounts; - response.jobTypes = jobTypes; - return response; - } - - public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> jobDurations, String timeList) { - JobCountResponse response = new JobCountResponse(); - List<UnitJobCount> jobCounts = new ArrayList<>(); - Set<String> jobTypes = new HashSet<>(); - List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList); - for (int i = 0; i < times.size(); i++) { - jobCounts.add(new UnitJobCount(times.get(i))); - } - for (JobExecutionAPIEntity job : jobDurations) { - int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime()); - UnitJobCount counter = jobCounts.get(jobIndex); - String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString()); - jobTypes.add(jobType); - countJob(counter, jobType); - } - response.jobCounts = jobCounts; - response.jobTypes = jobTypes; - return response; - } - - public void countJob(UnitJobCount counter, String jobType) { - if (null == jobType) { - jobType = "null"; - } - counter.jobCount++; - if (counter.jobCountByType.containsKey(jobType)) { - counter.jobCountByType.put(jobType, counter.jobCountByType.get(jobType) + 1); - } else { - counter.jobCountByType.put(jobType, 1L); - } - } - - public void countJob(List<UnitJobCount> jobCounts, long 
jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) { - long startCountPoint = jobCounts.get(0).timeBucket / DateTimeUtil.ONESECOND; - if (jobEndTimeSecs < startCountPoint) { - return; - } - int startIndex = 0; - if (jobStartTimeSecs > startCountPoint) { - long relativeStartTime = jobStartTimeSecs - startCountPoint; - startIndex = (int) (relativeStartTime / intervalInSecs) + (relativeStartTime % intervalInSecs == 0 ? 0 : 1); - } - long relativeEndTime = jobEndTimeSecs - startCountPoint; - int endIndex = (int) (relativeEndTime / intervalInSecs); - - for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) { - countJob(jobCounts.get(i), jobType); - } - } - - public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> jobEntities) { - List<String> pair = new ArrayList<>(); - long minStartTime = System.currentTimeMillis(); - long maxEndTime = 0; - for (JobExecutionAPIEntity jobEntity : jobEntities) { - if (minStartTime > jobEntity.getStartTime()) { - minStartTime = jobEntity.getStartTime(); - } - if (maxEndTime < jobEntity.getEndTime()) { - maxEndTime = jobEntity.getEndTime(); - } - } - pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime)); - pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime)); - return pair; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java index e6041f2..204412b 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java +++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java @@ -18,21 +18,18 @@ package org.apache.eagle.service.jpm; -import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID; -import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE; - import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; -import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.generic.GenericEntityServiceResource; import org.apache.eagle.service.generic.ListQueryResource; import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse; -import org.apache.eagle.service.jpm.MRJobTaskCountResponse.TaskCountPerJobResponse; import org.apache.commons.lang.time.StopWatch; +import org.apache.eagle.service.jpm.count.MRJobCountImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +40,14 @@ import javax.ws.rs.core.MediaType; @Path("mrJobs") public class MRJobExecutionResource { - GenericEntityServiceResource resource = new GenericEntityServiceResource(); + + private static final Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class); + public static final String ELAPSEDMS = "elapsedms"; public static final String TOTAL_RESULTS = "totalResults"; - private static final Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class); + private final MRJobCountImpl helper = new MRJobCountImpl(); + private GenericEntityServiceResource resource = new GenericEntityServiceResource(); @GET @Produces(MediaType.APPLICATION_JSON) @@ -76,14 +76,15 @@ public class MRJobExecutionResource { if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { finishedJobs.add(o); - 
jobIds.add(o.getTags().get(JOB_ID.toString())); + jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString())); } jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME); res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing, parallel, metricName, verbose); if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { - if (!isDuplicate(jobIds, o)) { + String key = o.getTags().get(MRJobTagName.JOB_ID.toString()); + if (!ResourceUtils.isDuplicate(jobIds, key)) { jobs.add(o); } } @@ -105,13 +106,6 @@ public class MRJobExecutionResource { } - private boolean isDuplicate(Set<String> keys, TaggedLogAPIEntity o) { - if (keys.isEmpty()) { - return false; - } - return keys.contains(o.getTags().get(JOB_ID.toString())); - } - private String buildCondition(String jobId, String jobDefId, String site) { String conditionFormat = "@site=\"%s\""; String condition = null; @@ -142,7 +136,7 @@ public class MRJobExecutionResource { List<TaggedLogAPIEntity> jobs = new ArrayList<>(); Set<String> jobIds = new HashSet<>(); String condition = buildCondition(jobId, jobDefId, site); - final int pageSize = Integer.MAX_VALUE; + if (condition == null) { response.setException(new Exception("Search condition is empty")); response.setSuccess(false); @@ -155,18 +149,19 @@ public class MRJobExecutionResource { stopWatch.start(); String queryFormat = "%s[%s]{*}"; String queryString = String.format(queryFormat, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition); - GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = ResourceUtils.getQueryResult(queryString, null, null); if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { jobs.add(o); - 
jobIds.add(o.getTags().get(JOB_ID.toString())); + jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString())); } } queryString = String.format(queryFormat, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition); - res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + res = ResourceUtils.getQueryResult(queryString, null, null); if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { - if (!isDuplicate(jobIds, o)) { + String key = o.getTags().get(MRJobTagName.JOB_ID.toString()); + if (!ResourceUtils.isDuplicate(jobIds, key)) { jobs.add(o); } } @@ -193,67 +188,6 @@ public class MRJobExecutionResource { return response; } - - - - @GET - @Path("{jobId}/taskCountsByDuration") - @Produces(MediaType.APPLICATION_JSON) - public TaskCountPerJobResponse getTaskCountsPerJob(@PathParam("jobId") String jobId, - @QueryParam("site") String site, - @QueryParam("timelineInSecs") String timeList, - @QueryParam("top") long top) { - TaskCountPerJobResponse response = new TaskCountPerJobResponse(); - if (jobId == null || site == null || timeList == null || timeList.isEmpty()) { - response.errMessage = "IllegalArgumentException: jobId == null || site == null || timelineInSecs == null or isEmpty"; - return response; - } - TaskCountByDurationHelper helper = new TaskCountByDurationHelper(); - List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>(); - List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>(); - - List<Long> times = helper.parseTimeList(timeList); - String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId); - GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes = - resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); - if (historyRes.isSuccess() && historyRes.getObj() != 
null && historyRes.getObj().size() > 0) { - helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.HistoryTaskComparator()); - for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) { - int index = helper.getPosition(times, o.getDuration()); - MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index); - helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); - counter.entities.add(o); - } - } else { - query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId); - GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes = - resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); - if (runningRes.isSuccess() && runningRes.getObj() != null) { - helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator()); - for (TaskExecutionAPIEntity o : runningRes.getObj()) { - int index = helper.getPosition(times, o.getDuration()); - if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) { - MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index); - helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); - counter.entities.add(o); - } else if (o.getEndTime() != 0) { - MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index); - helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); - counter.entities.add(o); - } - } - } - } - if (top > 0) { - helper.getTopTasks(runningTaskCount, top); - response.runningTaskCount = runningTaskCount; - helper.getTopTasks(finishedTaskCount, top); - response.finishedTaskCount = finishedTaskCount; - } - response.topNumber = top; - return response; - } - @GET @Path("runningJobCounts") @Produces(MediaType.APPLICATION_JSON) @@ -262,7 +196,6 @@ public class MRJobExecutionResource { 
@QueryParam("durationEnd") String endTime, @QueryParam("intervalInSecs") long intervalInSecs) { JobCountResponse response = new JobCountResponse(); - MRJobCountHelper helper = new MRJobCountHelper(); if (site == null || startTime == null || endTime == null) { response.errMessage = "IllegalArgument: site, durationBegin, durationEnd is null"; return response; @@ -271,26 +204,22 @@ public class MRJobExecutionResource { response.errMessage = String.format("IllegalArgument: intervalInSecs=%s is invalid", intervalInSecs); return response; } - long startTimeInMills; String searchStartTime = startTime; String searchEndTime = endTime; try { - startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND; searchStartTime = helper.moveTimeForwardOneDay(searchStartTime); } catch (Exception e) { response.errMessage = e.getMessage(); return response; } - String query = String.format("%s[@site=\"%s\" AND @endTime>=%s]{@startTime,@endTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, startTimeInMills); - GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = - resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + String query = String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType,@jobId}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site); + GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = ResourceUtils.getQueryResult(query, searchStartTime, searchEndTime); if (!historyRes.isSuccess() || historyRes.getObj() == null) { response.errMessage = String.format("Catch an exception during fetch history jobs: %s with query=%s", historyRes.getException(), query); return response; } - query = String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType}", Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site); - GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningRes = - resource.search(query, 
searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + query = String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType,@jobId}", Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site); + GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningRes = ResourceUtils.getQueryResult(query, searchStartTime, searchEndTime); if (!runningRes.isSuccess() || runningRes.getObj() == null) { response.errMessage = String.format("Catch an exception during fetch running jobs: %s with query=%s", runningRes.getException(), query); return response; @@ -332,7 +261,6 @@ public class MRJobExecutionResource { public Object getJobMetrics(String site, String timePoint, String metricName, long intervalmin, int top, Function6<String, String, String, Long, Integer, String, Object> metricQueryFunc) { GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity(); - MRJobCountHelper helper = new MRJobCountHelper(); if (site == null || timePoint == null || metricName == null) { response.setException(new IllegalArgumentException("Error: site, timePoint, metricName may be unset")); response.setSuccess(false); @@ -360,8 +288,7 @@ public class MRJobExecutionResource { } String query = String.format("%s[@site=\"%s\" AND @startTime<=\"%s\" AND @endTime>=\"%s\"]{@startTime,@endTime}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, timePointsInMills, timePointsInMills); - GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = - resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = ResourceUtils.getQueryResult(query, searchStartTime, searchEndTime); if (!historyRes.isSuccess() || historyRes.getObj() == null) { return historyRes; } @@ -372,14 +299,14 @@ public class MRJobExecutionResource { return metricQueryFunc.apply(query, 
timeDuration.get(0), timeDuration.get(1), intervalmin, top, metricName); } - Function6<String, String, String, Long, Integer, String, Object> queryMetricEntitiesFunc + private Function6<String, String, String, Long, Integer, String, Object> queryMetricEntitiesFunc = (query, startTime, endTime, intervalmin, top, metricName) -> { GenericEntityServiceResource resource = new GenericEntityServiceResource(); return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, intervalmin, top, true, 0, metricName, false); }; - Function6<String, String, String, Long, Integer, String, Object> queryMetricListFunc + private Function6<String, String, String, Long, Integer, String, Object> queryMetricListFunc = (query, startTime, endTime, intervalmin, top, metricName) -> { ListQueryResource resource = new ListQueryResource(); return resource.listQuery(query, startTime, endTime, Integer.MAX_VALUE, null, @@ -395,24 +322,22 @@ public class MRJobExecutionResource { @Path("jobCountsByDuration") @Produces(MediaType.APPLICATION_JSON) public JobCountResponse getJobCountGroupByDuration(@QueryParam("site") String site, - @QueryParam("timelineInSecs") String timeList, - @QueryParam("jobStartTimeBegin") String startTime, - @QueryParam("jobStartTimeEnd") String endTime) { + @QueryParam("timeDistInSecs") String timeList, + @QueryParam("startTime") String startTime, + @QueryParam("endTime") String endTime) { JobCountResponse response = new JobCountResponse(); - MRJobCountHelper helper = new MRJobCountHelper(); if (site == null || startTime == null || endTime == null || timeList == null) { - response.errMessage = "IllegalArgument: site, jobStartTimeBegin, jobStartTimeEnd, or timelineInSecs is null"; + response.errMessage = "IllegalArgument: site, startTime, endTime, or timeDistInSecs is null"; return response; } String query = String.format("%s[@site=\"%s\"]{@durationTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site); - 
GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = - resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = ResourceUtils.getQueryResult(query, startTime, endTime); if (!historyRes.isSuccess() || historyRes.getObj() == null) { response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query); return response; } try { - return helper.getHistoryJobCount(historyRes.getObj(), timeList); + return helper.getHistoryJobCountGroupByDuration(historyRes.getObj(), timeList); } catch (Exception e) { response.errMessage = e.getMessage(); return response; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java index 170533c..45ffbd7 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java @@ -29,12 +29,16 @@ public class MRJobTaskCountResponse { public List<UnitTaskCount> finishedTaskCount; } + public static class HistoryTaskCountResponse extends MRJobTaskCountResponse { + public List<UnitTaskCount> taskCount; + } + public static class JobCountResponse extends MRJobTaskCountResponse { public Set<String> jobTypes; public List<UnitJobCount> jobCounts; } - static class UnitTaskCount { + public static class UnitTaskCount { public long timeBucket; public int taskCount; public int mapTaskCount; @@ -42,7 +46,7 @@ public class MRJobTaskCountResponse { public 
Set entities; public List topEntities; - UnitTaskCount(long timeBucket, Comparator comparator) { + public UnitTaskCount(long timeBucket, Comparator comparator) { this.timeBucket = timeBucket; this.taskCount = 0; this.mapTaskCount = 0; @@ -52,12 +56,12 @@ public class MRJobTaskCountResponse { } } - static class UnitJobCount { + public static class UnitJobCount { public long timeBucket; public long jobCount; public Map<String, Long> jobCountByType; - UnitJobCount(long timeBucket) { + public UnitJobCount(long timeBucket) { this.timeBucket = timeBucket; this.jobCount = 0; this.jobCountByType = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java new file mode 100644 index 0000000..1125387 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; + +import org.apache.eagle.service.jpm.count.MRTaskCountImpl; +import org.apache.eagle.service.jpm.suggestion.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE; + +@Path("mrTasks") +public class MRTaskExecutionResource { + private static final Logger LOG = LoggerFactory.getLogger(MRTaskExecutionResource.class); + MRTaskCountImpl taskCountImpl = new MRTaskCountImpl(); + + @GET + @Path("taskCountsByDuration") + @Produces(MediaType.APPLICATION_JSON) + public MRJobTaskCountResponse.TaskCountPerJobResponse getTaskCountsGroupByDuration(@QueryParam("site") String site, + @QueryParam("jobId") String jobId, + @QueryParam("jobStartTime") String jobStartTime, + @QueryParam("jobEndTime") String jobEndTime, + @QueryParam("timeDistInSecs") String timeDistInSecs, + @QueryParam("top") long top) { + MRJobTaskCountResponse.TaskCountPerJobResponse response = new MRJobTaskCountResponse.TaskCountPerJobResponse(); + if (jobId == null || site == null || timeDistInSecs == null || timeDistInSecs.isEmpty()) { + response.errMessage = "IllegalArgumentException: jobId == null || site == null || timeDistInSecs == null or isEmpty"; + return response; + } + List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>(); + List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>(); + + List<Long> times = 
ResourceUtils.parseDistributionList(timeDistInSecs); + String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId); + GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes = + ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime); + if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) { + taskCountImpl.initTaskCountList(runningTaskCount, finishedTaskCount, times, new MRTaskCountImpl.HistoryTaskComparator()); + for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) { + int index = ResourceUtils.getDistributionPosition(times, o.getDuration() / DateTimeUtil.ONESECOND); + MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index); + taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString())); + counter.entities.add(o); + } + } else { + query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId); + GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity> runningRes = + ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime); + if (runningRes.isSuccess() && runningRes.getObj() != null) { + taskCountImpl.initTaskCountList(runningTaskCount, finishedTaskCount, times, new MRTaskCountImpl.RunningTaskComparator()); + for (org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity o : runningRes.getObj()) { + int index = ResourceUtils.getDistributionPosition(times, o.getDuration() / DateTimeUtil.ONESECOND); + if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) { + MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index); + taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString())); + counter.entities.add(o); + } else if (o.getEndTime() != 0) { + MRJobTaskCountResponse.UnitTaskCount 
counter = finishedTaskCount.get(index); + taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString())); + counter.entities.add(o); + } + } + } + } + if (top > 0) { + taskCountImpl.getTopTasks(runningTaskCount, top); + response.runningTaskCount = runningTaskCount; + taskCountImpl.getTopTasks(finishedTaskCount, top); + response.finishedTaskCount = finishedTaskCount; + } + response.topNumber = top; + return response; + } + + private MRTaskExecutionResponse.TaskGroupResponse getTaskGroups(@QueryParam("site") String site, + @QueryParam("shortJob_id") String shortDurationJobId, + @QueryParam("longJob_id") String longDurationJobId) { + MRTaskExecutionResponse.TaskGroupResponse result = new MRTaskExecutionResponse.TaskGroupResponse(); + String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, shortDurationJobId); + GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> smallResponse = ResourceUtils.getQueryResult(query, null, null); + if (!smallResponse.isSuccess() || smallResponse.getObj() == null) { + result.errMessage = smallResponse.getException(); + return result; + } + long longestDuration = 0; + for (TaskExecutionAPIEntity entity : smallResponse.getObj()) { + if (entity.getDuration() > longestDuration) { + longestDuration = entity.getDuration(); + } + } + query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, longDurationJobId); + GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> largeResponse = ResourceUtils.getQueryResult(query, null, null); + if (!largeResponse.isSuccess() || largeResponse.getObj() == null) { + result.errMessage = largeResponse.getException(); + return result; + } + result.tasksGroupByType = new HashMap<>(); + result.tasksGroupByType.put(Constants.TaskType.MAP.toString(), new MRTaskExecutionResponse.TaskGroup()); + result.tasksGroupByType.put(Constants.TaskType.REDUCE.toString(), new 
MRTaskExecutionResponse.TaskGroup()); + groupTasksByValue(result, false, largeResponse.getObj(), longestDuration); + groupTasksByValue(result, true, smallResponse.getObj(), longestDuration); + + return result; + } + + public MRTaskExecutionResponse.TaskGroupResponse groupTasksByValue(MRTaskExecutionResponse.TaskGroupResponse result, boolean keepShort, List<TaskExecutionAPIEntity> tasks, long value) { + for (TaskExecutionAPIEntity entity : tasks) { + String taskType = entity.getTags().get(MRJobTagName.TASK_TYPE.toString()); + MRTaskExecutionResponse.TaskGroup taskGroup = result.tasksGroupByType.get(taskType.toUpperCase()); + if (entity.getDuration() <= value && keepShort) { + taskGroup.shortTasks.add(entity); + } + if (entity.getDuration() > value) { + taskGroup.longTasks.add(entity); + } + } + return result; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("taskSuggestion") + public List<MRTaskExecutionResponse.JobSuggestionResponse> getSuggestion(@QueryParam("site") String site, + @QueryParam("shortJob_id") String shortDurationJobId, + @QueryParam("longJob_id") String longDurationJobId, + @QueryParam("mapInputThreshold") long mapInputThreshold, + @QueryParam("reduceInputThreshold") long reduceInputThreshold, + @QueryParam("mapGcThreshold") long mapGcThreshold, + @QueryParam("reduceGcThreshold") long reduceGcThreshold, + @QueryParam("mapSpillThreshold") long mapSpillThreshold) { + List<MRTaskExecutionResponse.JobSuggestionResponse> result = new ArrayList<>(); + MRTaskExecutionResponse.TaskGroupResponse taskGroups = getTaskGroups(site, shortDurationJobId, longDurationJobId); + if (taskGroups.errMessage != null) { + LOG.error(taskGroups.errMessage); + return result; + } + List<SuggestionFunc> suggestionFuncs = new ArrayList<>(); + suggestionFuncs.add(new MapInputFunc(mapInputThreshold)); + suggestionFuncs.add(new ReduceInputFunc(reduceInputThreshold)); + suggestionFuncs.add(new MapGCFunc(mapGcThreshold)); + suggestionFuncs.add(new 
ReduceGCFunc(reduceGcThreshold)); + suggestionFuncs.add(new MapSpillFunc(mapSpillThreshold)); + try { + for (SuggestionFunc func : suggestionFuncs) { + result.add(func.apply(taskGroups)); + } + } catch (Exception ex) { + ex.printStackTrace(); + return result; + } + return result; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("historyTaskCount") + public MRJobTaskCountResponse.HistoryTaskCountResponse getTaskCountInMinute(@QueryParam("site") String site, + @QueryParam("jobId") String jobId, + @QueryParam("jobStartTime") String jobStartTime, + @QueryParam("jobEndTime") String jobEndTime) { + MRJobTaskCountResponse.HistoryTaskCountResponse result = new MRJobTaskCountResponse.HistoryTaskCountResponse(); + if (jobId == null || site == null || jobStartTime == null || jobEndTime == null) { + result.errMessage = "IllegalArgumentException: jobId, or site, or jobStartTime, or jobEndTime is null"; + return result; + } + + String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{@startTime,@endTime,@taskType}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId); + GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> response = ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime); + if (!response.isSuccess() || response.getObj() == null) { + result.errMessage = response.getException(); + return result; + } + try { + long startTimeInMin = DateTimeUtil.humanDateToSeconds(jobStartTime) / 60; + long endTimeInMin = DateTimeUtil.humanDateToSeconds(jobEndTime) / 60; + return taskCountImpl.countHistoryTask(response.getObj(), startTimeInMin, endTimeInMin); + } catch (Exception e) { + e.printStackTrace(); + result.errMessage = e.getMessage(); + return result; + } + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("taskDistribution/{counterName}") + public MRTaskExecutionResponse.TaskDistributionResponse getTaskDistributionByCounterName(@QueryParam("site") String site, + @QueryParam("jobId") String jobId, + @QueryParam("jobStartTime") 
String jobStartTime, + @QueryParam("jobEndTime") String jobEndTime, + @QueryParam("taskType") String taskType, + @PathParam("counterName") String counterName, + @QueryParam("distRange") String distRange) { + MRTaskExecutionResponse.TaskDistributionResponse result = new MRTaskExecutionResponse.TaskDistributionResponse(); + String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\" AND @taskType=\"%s\"]{@jobCounters}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, + site, jobId, Constants.TaskType.MAP.toString()); + GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> response = ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime); + if (!response.isSuccess() || response.getObj() == null) { + result.errMessage = response.getException(); + return result; + } + try { + return taskCountImpl.getHistoryTaskDistribution(response.getObj(), counterName, distRange); + } catch (Exception e) { + e.printStackTrace(); + result.errMessage = e.getMessage(); + return result; + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java new file mode 100644 index 0000000..c7cc258 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + + +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class MRTaskExecutionResponse { + public String errMessage; + + public static class TaskGroupResponse extends MRTaskExecutionResponse { + public Map<String, TaskGroup> tasksGroupByType; + } + + public static class TaskGroup { + public List<TaskExecutionAPIEntity> longTasks; + public List<TaskExecutionAPIEntity> shortTasks; + + public TaskGroup() { + longTasks = new ArrayList<>(); + shortTasks = new ArrayList<>(); + } + } + + public static class JobSuggestionResponse extends MRTaskExecutionResponse { + public String suggestionType; + public List<SuggestionResult> suggestionResults; + } + + public static class SuggestionResult { + public String name; + public double value; + public String suggestion; + + public SuggestionResult(String name, double value, String suggestion) { + this.name = name; + this.value = value; + this.suggestion = suggestion; + } + + public SuggestionResult(String name, double value) { + this.value = value; + this.name = name; + } + } + + public static class TaskDistributionResponse extends MRTaskExecutionResponse { + public String counterName; + public List<CountUnit> taskBuckets; + + public TaskDistributionResponse() { + taskBuckets = new ArrayList<>(); + } + } + 
+ public static class CountUnit { + public long bucket; + public long countVal; + + public CountUnit(long bucket) { + this.bucket = bucket; + this.countVal = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java new file mode 100644 index 0000000..381c7ec --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.service.jpm; + +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.generic.GenericEntityServiceResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class ResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class); + + static GenericEntityServiceResource resource = new GenericEntityServiceResource(); + + public static GenericServiceAPIResponseEntity getQueryResult(String query, String startTime, String endTime) { + return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + } + + public static double[] getCounterValues(List<TaskExecutionAPIEntity> tasks, JobCounters.CounterName counterName) { + List<Double> values = new ArrayList<>(); + for (TaskExecutionAPIEntity task : tasks) { + values.add(Double.valueOf(task.getJobCounters().getCounterValue(counterName))); + } + return toArray(values); + } + + public static double[] toArray(List<Double> input) { + double[] result = new double[input.size()]; + for (int i = 0; i < result.length; i++) { + result[i] = input.get(i); + } + return result; + } + + public static List<Long> parseDistributionList(String timelist) { + List<Long> times = new ArrayList<>(); + String [] strs = timelist.split("[,\\s]"); + for (String str : strs) { + try { + times.add(Long.parseLong(str)); + } catch (Exception ex) { + LOG.warn(str + " is not a number"); + } + } + return times; + } + + public static int getDistributionPosition(List<Long> rangeList, Long value) { + for (int i = 1; i < rangeList.size(); i++) { + if (value < rangeList.get(i)) { + return i - 1; + } + } + return rangeList.size() - 1; + } + + public static boolean 
isDuplicate(Set<String> keys, String jobId) { + if (keys.isEmpty()) { + return false; + } + return keys.contains(jobId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java deleted file mode 100644 index 0eeb440..0000000 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.service.jpm; - -import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; -import org.apache.eagle.jpm.util.Constants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; - -public class TaskCountByDurationHelper { - - private static final Logger LOG = LoggerFactory.getLogger(TaskCountByDurationHelper.class); - - public static List<Long> parseTimeList(String timelist) { - List<Long> times = new ArrayList<>(); - String [] strs = timelist.split("[,\\s]"); - for (String str : strs) { - try { - times.add(Long.parseLong(str)); - } catch (Exception ex) { - LOG.warn(str + " is not a number"); - } - } - return times; - } - - public static int getPosition(List<Long> times, Long duration) { - duration = duration / 1000; - for (int i = 1; i < times.size(); i++) { - if (duration < times.get(i)) { - return i - 1; - } - } - return times.size() - 1; - } - - public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, long top) { - for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) { - Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator(); - for (int i = 0; i < top && iterator.hasNext(); i++) { - taskCounter.topEntities.add(iterator.next()); - } - taskCounter.entities.clear(); - } - } - - public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) { - counter.taskCount++; - if (taskType.equalsIgnoreCase(Constants.TaskType.MAP.toString())) { - counter.mapTaskCount++; - } else if (taskType.equalsIgnoreCase(Constants.TaskType.REDUCE.toString())) { - counter.reduceTaskCount++; - } - } - - public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount, - List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount, - List<Long> times, - Comparator comparator) { - for (int i = 0; i < times.size(); i++) { - runningTaskCount.add(new 
MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator)); - finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator)); - } - } - - static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> { - @Override - public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) { - Long time1 = o1.getDuration(); - Long time2 = o2.getDuration(); - return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); - } - } - - static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> { - @Override - public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1, - org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) { - Long time1 = o1.getDuration(); - Long time2 = o2.getDuration(); - return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java new file mode 100644 index 0000000..d8fc004 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm.count; + +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse; +import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount; +import org.apache.eagle.service.jpm.ResourceUtils; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class MRJobCountImpl { + + public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) { + for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) { + jobCounts.add(new UnitJobCount(i * intervalInSecs * DateTimeUtil.ONESECOND)); + } + } + + public String moveTimeForwardOneDay(String startTime) throws ParseException { + long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime); + timeInSecs -= 24L * 60L * 60L; + return DateTimeUtil.secondsToHumanDate(timeInSecs); + } + + public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> historyJobs, + List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs, + long startTimeInSecs, + long endTimeInSecs, + long intervalInSecs) { + List<UnitJobCount> jobCounts = new ArrayList<>(); + Set<String> jobTypes = new HashSet<>(); + Set<String> jobIds = new HashSet<>(); + initJobCountList(jobCounts, 
startTimeInSecs, endTimeInSecs, intervalInSecs); + long startTimeInMs = startTimeInSecs * DateTimeUtil.ONESECOND; + for (JobExecutionAPIEntity job: historyJobs) { + jobIds.add(job.getTags().get(MRJobTagName.JOB_ID.toString())); + if (job.getEndTime() >= startTimeInMs) { + String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString()); + jobTypes.add(jobType); + countJob(jobCounts, job.getStartTime() / DateTimeUtil.ONESECOND, job.getEndTime() / DateTimeUtil.ONESECOND, intervalInSecs, jobType); + } + } + for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : runningJobs) { + if (!ResourceUtils.isDuplicate(jobIds, job.getTags().get(MRJobTagName.JOB_ID.toString()))) { + String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString()); + jobTypes.add(jobType); + countJob(jobCounts, job.getStartTime() / DateTimeUtil.ONESECOND, endTimeInSecs, intervalInSecs, jobType); + } + } + JobCountResponse response = new JobCountResponse(); + response.jobCounts = jobCounts; + response.jobTypes = jobTypes; + return response; + } + + public JobCountResponse getHistoryJobCountGroupByDuration(List<JobExecutionAPIEntity> jobDurations, String timeList) { + JobCountResponse response = new JobCountResponse(); + List<UnitJobCount> jobCounts = new ArrayList<>(); + Set<String> jobTypes = new HashSet<>(); + List<Long> times = ResourceUtils.parseDistributionList(timeList); + for (int i = 0; i < times.size(); i++) { + jobCounts.add(new UnitJobCount(times.get(i))); + } + for (JobExecutionAPIEntity job : jobDurations) { + int jobIndex = ResourceUtils.getDistributionPosition(times, job.getDurationTime() / DateTimeUtil.ONESECOND); + UnitJobCount counter = jobCounts.get(jobIndex); + String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString()); + jobTypes.add(jobType); + countJob(counter, jobType); + } + response.jobCounts = jobCounts; + response.jobTypes = jobTypes; + return response; + } + + public void countJob(UnitJobCount counter, String jobType) { + if (null == 
jobType) { + jobType = "null"; + } + counter.jobCount++; + if (counter.jobCountByType.containsKey(jobType)) { + counter.jobCountByType.put(jobType, counter.jobCountByType.get(jobType) + 1); + } else { + counter.jobCountByType.put(jobType, 1L); + } + } + + public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) { + long startCountPoint = jobCounts.get(0).timeBucket / DateTimeUtil.ONESECOND; + if (jobEndTimeSecs < startCountPoint) { + return; + } + int startIndex = 0; + if (jobStartTimeSecs > startCountPoint) { + long relativeStartTime = jobStartTimeSecs - startCountPoint; + startIndex = (int) (relativeStartTime / intervalInSecs) + (relativeStartTime % intervalInSecs == 0 ? 0 : 1); + } + long relativeEndTime = jobEndTimeSecs - startCountPoint; + int endIndex = (int) (relativeEndTime / intervalInSecs); + + for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) { + countJob(jobCounts.get(i), jobType); + } + } + + public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> jobEntities) { + List<String> pair = new ArrayList<>(); + long minStartTime = System.currentTimeMillis(); + long maxEndTime = 0; + for (JobExecutionAPIEntity jobEntity : jobEntities) { + if (minStartTime > jobEntity.getStartTime()) { + minStartTime = jobEntity.getStartTime(); + } + if (maxEndTime < jobEntity.getEndTime()) { + maxEndTime = jobEntity.getEndTime(); + } + } + pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime)); + pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime)); + return pair; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java new file mode 100644 index 0000000..f5017eb --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.eagle.service.jpm.count;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Helpers that fold MapReduce task executions into per-time-slot task counters and
+ * into counter-value distribution buckets.
+ */
+public class MRTaskCountImpl {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MRTaskCountImpl.class);
+
+    /**
+     * For every counter, copies at most {@code top} entities from {@code entities} to
+     * {@code topEntities} (in the iteration order of {@code entities}) and then clears
+     * {@code entities}.
+     *
+     * @param list counters to trim in place
+     * @param top  maximum number of entities kept per counter; nothing is kept when it is not positive
+     */
+    public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, long top) {
+        for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) {
+            Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
+            for (long kept = 0; kept < top && iterator.hasNext(); kept++) {
+                taskCounter.topEntities.add(iterator.next());
+            }
+            taskCounter.entities.clear();
+        }
+    }
+
+    /**
+     * Increments the total task count of {@code counter} and, when {@code taskType} names a
+     * map or reduce task (case-insensitive), the matching per-type count as well.
+     * A {@code null} or unrecognized type only contributes to the total.
+     *
+     * @param counter  counter to update
+     * @param taskType task type name, may be {@code null}
+     */
+    public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) {
+        counter.taskCount++;
+        // Constant-first comparison: a task without a type tag must not raise a NullPointerException.
+        if (Constants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+            counter.mapTaskCount++;
+        } else if (Constants.TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+            counter.reduceTaskCount++;
+        }
+    }
+
+    /**
+     * Appends one fresh counter per entry of {@code times} to both target lists.
+     *
+     * @param runningTaskCount  receives the counters for running tasks
+     * @param finishedTaskCount receives the counters for finished tasks
+     * @param times             time value handed to each counter
+     * @param comparator        comparator handed to each counter (kept as a raw type for caller compatibility)
+     */
+    public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount,
+                                  List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount,
+                                  List<Long> times,
+                                  Comparator comparator) {
+        for (Long time : times) {
+            runningTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(time, comparator));
+            finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(time, comparator));
+        }
+    }
+
+    /** Orders running task entities by duration, longest first. */
+    public static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
+        @Override
+        public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
+            // Arguments are swapped on purpose to obtain descending order.
+            return Long.compare(o2.getDuration(), o1.getDuration());
+        }
+    }
+
+    /** Orders history task entities by duration, longest first. */
+    public static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
+        @Override
+        public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
+                           org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
+            // Arguments are swapped on purpose to obtain descending order.
+            return Long.compare(o2.getDuration(), o1.getDuration());
+        }
+    }
+
+    /**
+     * Counts, for every minute slot in {@code [startTimeInMin, endTimeInMin]}, the history tasks
+     * whose start-to-end span covers that slot.
+     *
+     * <p>Task start and end times are divided by {@code DateTimeUtil.ONEMINUTE} to obtain their slot.
+     * A task that starts before or ends after the requested window is only counted in the slots
+     * that lie inside the window; a task entirely outside the window is not counted.
+     *
+     * @param tasks          history task executions to count
+     * @param startTimeInMin first slot of the window (inclusive)
+     * @param endTimeInMin   last slot of the window (inclusive)
+     * @return response whose {@code taskCount} holds one counter per slot, in slot order
+     */
+    public MRJobTaskCountResponse.HistoryTaskCountResponse countHistoryTask(List<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> tasks, long startTimeInMin, long endTimeInMin) {
+        List<MRJobTaskCountResponse.UnitTaskCount> taskCounts = new ArrayList<>();
+        for (long slot = startTimeInMin; slot <= endTimeInMin; slot++) {
+            taskCounts.add(new MRJobTaskCountResponse.UnitTaskCount(slot * DateTimeUtil.ONEMINUTE, null));
+        }
+        final int lastIndex = taskCounts.size() - 1;
+        for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity task : tasks) {
+            String taskType = task.getTags().get(MRJobTagName.TASK_TYPE.toString());
+            long taskStartTimeMin = task.getStartTime() / DateTimeUtil.ONEMINUTE;
+            long taskEndTimeMin = task.getEndTime() / DateTimeUtil.ONEMINUTE;
+            // Clamp to the window before narrowing to int so that tasks overlapping the window
+            // edges cannot index outside of taskCounts.
+            int firstSlot = (int) Math.max(taskStartTimeMin - startTimeInMin, 0L);
+            int lastSlot = (int) Math.min(taskEndTimeMin - startTimeInMin, (long) lastIndex);
+            for (int i = firstSlot; i <= lastSlot; i++) {
+                countTask(taskCounts.get(i), taskType);
+            }
+        }
+        MRJobTaskCountResponse.HistoryTaskCountResponse response = new MRJobTaskCountResponse.HistoryTaskCountResponse();
+        response.taskCount = taskCounts;
+        return response;
+    }
+
+    /**
+     * Builds a histogram of one job counter over the given history tasks.
+     *
+     * <p>One bucket is created per value returned by {@code ResourceUtils.parseDistributionList(distRange)};
+     * each task increments the bucket selected by {@code ResourceUtils.getDistributionPosition}.
+     * Both helpers are defined outside this class, so the accepted {@code distRange} syntax and the
+     * bucket boundary rules are whatever they implement.
+     *
+     * @param tasks       history task executions to distribute
+     * @param counterName name of a {@code JobCounters.CounterName} constant, matched case-insensitively
+     * @param distRange   textual description of the bucket boundaries
+     * @return response carrying {@code counterName} and the filled buckets
+     */
+    public MRTaskExecutionResponse.TaskDistributionResponse getHistoryTaskDistribution(List<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> tasks, String counterName, String distRange) {
+        MRTaskExecutionResponse.TaskDistributionResponse response = new MRTaskExecutionResponse.TaskDistributionResponse();
+        response.counterName = counterName;
+        List<Long> distRangeList = ResourceUtils.parseDistributionList(distRange);
+        for (Long bucketStart : distRangeList) {
+            response.taskBuckets.add(new MRTaskExecutionResponse.CountUnit(bucketStart));
+        }
+        // Locale.ROOT keeps the constant lookup independent of the JVM default locale
+        // (e.g. the Turkish dotted/dotless i).
+        JobCounters.CounterName jobCounterName = JobCounters.CounterName.valueOf(counterName.toUpperCase(Locale.ROOT));
+        for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity task : tasks) {
+            Long counterValue = task.getJobCounters().getCounterValue(jobCounterName);
+            int pos = ResourceUtils.getDistributionPosition(distRangeList, counterValue);
+            response.taskBuckets.get(pos).countVal++;
+        }
+        return response;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
new file mode 100644
index 0000000..ad38fcb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroup;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base suggestion function that compares the GC ratio (average GC milliseconds divided by
+ * average CPU milliseconds) of the long tasks of a task group with that of its short tasks.
+ * Subclasses choose the task group through {@link #getTasks(TaskGroupResponse)}.
+ */
+public abstract class AbstractGCFunc implements SuggestionFunc {
+
+    private static final String GC_RATIO_NAME_FORMAT = "gcRatio (%s / %s) deviation";
+    private static final String GC_SUGGESTION_FORMAT = "gcRatio deviation exceeds threshold %.2f, where the deviation is %.2f / %.2f";
+
+    /** Threshold used when the caller supplies none, or a non-positive one. */
+    private static final double GC_RATIO_DEVIATION_THRESHOLD = 2;
+
+    private Constants.SuggestionType suggestionType;
+    private double threshold;
+
+    /**
+     * Creates a function that uses the default deviation threshold.
+     *
+     * @param suggestionType type reported in the response
+     */
+    public AbstractGCFunc(Constants.SuggestionType suggestionType) {
+        this.suggestionType = suggestionType;
+        this.threshold = GC_RATIO_DEVIATION_THRESHOLD;
+    }
+
+    /**
+     * Creates a function with a custom deviation threshold.
+     *
+     * @param suggestionType type reported in the response
+     * @param threshold      deviation above which a suggestion text is produced; a value
+     *                       that is not positive falls back to the default threshold
+     */
+    public AbstractGCFunc(Constants.SuggestionType suggestionType, double threshold) {
+        this.suggestionType = suggestionType;
+        this.threshold = threshold > 0 ? threshold : GC_RATIO_DEVIATION_THRESHOLD;
+    }
+
+    /**
+     * Selects the task group this function analyses.
+     *
+     * @param tasks grouped tasks of a job
+     * @return the group to analyse; may be {@code null} when the job has no such group
+     */
+    protected abstract TaskGroup getTasks(TaskGroupResponse tasks);
+
+
+    /**
+     * Returns average GC milliseconds divided by average CPU milliseconds over {@code tasks}.
+     * An empty list yields 0; an average CPU time of 0 is replaced by 1 to avoid dividing by zero.
+     */
+    private double getGcRatio(List<TaskExecutionAPIEntity> tasks) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+        double[] gcMs = ResourceUtils.getCounterValues(tasks, JobCounters.CounterName.GC_MILLISECONDS);
+        double[] cpuMs = ResourceUtils.getCounterValues(tasks, JobCounters.CounterName.CPU_MILLISECONDS);
+
+        DescriptiveStatistics statistics = new DescriptiveStatistics();
+        double averageCpuMs = statistics.getMeanImpl().evaluate(cpuMs);
+        double averageGcMs = statistics.getMeanImpl().evaluate(gcMs);
+        if (averageCpuMs == 0) {
+            averageCpuMs = 1;
+        }
+        return averageGcMs / averageCpuMs;
+    }
+
+    /**
+     * Compares the GC ratio of the long tasks with that of the short tasks.
+     *
+     * @param data grouped tasks of a job
+     * @return a response carrying the suggestion type; {@code suggestionResults} is only filled
+     *         when the selected group exists and contains at least one long task
+     */
+    @Override
+    public JobSuggestionResponse apply(TaskGroupResponse data) {
+        JobSuggestionResponse response = new JobSuggestionResponse();
+        response.suggestionType = suggestionType.name();
+
+        TaskGroup taskGroup = getTasks(data);
+        // A job without tasks of the selected type has no group at all; treat it like a
+        // group without long tasks instead of failing with a NullPointerException.
+        if (taskGroup == null || taskGroup.longTasks.isEmpty()) {
+            return response;
+        }
+
+        double smallerGcRatio = getGcRatio(taskGroup.shortTasks);
+        double largerGcRatio = getGcRatio(taskGroup.longTasks);
+        response.suggestionResults = getGCsuggest(smallerGcRatio, largerGcRatio);
+        return response;
+    }
+
+    /**
+     * Builds the single-element result list for the given ratios. The deviation is
+     * {@code largerRatio / smallerRatio}, with a non-positive {@code smallerRatio} replaced by 1;
+     * the suggestion text stays {@code null} unless the deviation exceeds the threshold.
+     */
+    private List<MRTaskExecutionResponse.SuggestionResult> getGCsuggest(double smallerRatio, double largerRatio) {
+        if (smallerRatio <= 0) {
+            smallerRatio = 1;
+        }
+        double deviation = largerRatio / smallerRatio;
+        String suggestName = String.format(GC_RATIO_NAME_FORMAT, JobCounters.CounterName.GC_MILLISECONDS.getName(), JobCounters.CounterName.CPU_MILLISECONDS.getName());
+        String suggestion = null;
+        if (deviation > threshold) {
+            suggestion = String.format(GC_SUGGESTION_FORMAT, threshold, largerRatio, smallerRatio);
+        }
+        List<MRTaskExecutionResponse.SuggestionResult> suggestionResults = new ArrayList<>();
+        suggestionResults.add(new MRTaskExecutionResponse.SuggestionResult(suggestName, deviation, suggestion));
+        return suggestionResults;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
new file mode 100644
index 0000000..5a89055
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base suggestion function that compares the average value of one job counter over the long
+ * tasks of a task group with the average over its short tasks.
+ * Subclasses choose the task group through {@link #getTasks(TaskGroupResponse)}.
+ */
+public abstract class AbstractInputFunc implements SuggestionFunc {
+
+    private JobCounters.CounterName counterName;
+    private Constants.SuggestionType suggestType;
+    private double threshold;
+    /** Threshold used when the caller supplies none, or a non-positive one. */
+    private static final long DATA_SKEW_THRESHOLD = 2;
+
+    private static final String DEVIATION_SUGGEST_FORMAT = "average %s deviation";
+    private static final String DATA_SKEW_SUGGESTION_FORMAT = "%s deviation exceeds threshold %.2f, where the deviation is %.2f / %.2f";
+
+    /**
+     * Creates a function that uses the default deviation threshold.
+     *
+     * @param counterName counter whose averages are compared
+     * @param type        type reported in the response
+     */
+    public AbstractInputFunc(JobCounters.CounterName counterName, Constants.SuggestionType type) {
+        this.counterName = counterName;
+        this.suggestType = type;
+        this.threshold = DATA_SKEW_THRESHOLD;
+    }
+
+    /**
+     * Creates a function with a custom deviation threshold.
+     *
+     * @param counterName counter whose averages are compared
+     * @param type        type reported in the response
+     * @param threshold   deviation above which a suggestion text is produced; a value
+     *                    that is not positive falls back to the default threshold
+     */
+    public AbstractInputFunc(JobCounters.CounterName counterName, Constants.SuggestionType type, double threshold) {
+        this.counterName = counterName;
+        this.suggestType = type;
+        this.threshold = threshold > 0 ? threshold : DATA_SKEW_THRESHOLD;
+    }
+
+    /**
+     * Selects the task group this function analyses.
+     *
+     * @param tasks grouped tasks of a job
+     * @return the group to analyse; may be {@code null} when the job has no such group
+     */
+    protected abstract MRTaskExecutionResponse.TaskGroup getTasks(TaskGroupResponse tasks);
+
+    /**
+     * Compares the average counter value of the long tasks with that of the short tasks.
+     *
+     * @param data grouped tasks of a job
+     * @return a response carrying the suggestion type; {@code suggestionResults} is only filled
+     *         when the selected group exists and contains at least one long task
+     */
+    @Override
+    public JobSuggestionResponse apply(TaskGroupResponse data) {
+        JobSuggestionResponse response = new JobSuggestionResponse();
+        response.suggestionType = suggestType.toString();
+
+        MRTaskExecutionResponse.TaskGroup taskGroup = getTasks(data);
+        // Nothing to compare without long tasks; this also keeps a NaN mean of an empty
+        // array from leaking into the reported deviation.
+        if (taskGroup == null || taskGroup.longTasks.isEmpty()) {
+            return response;
+        }
+
+        double avgSmaller = average(ResourceUtils.getCounterValues(taskGroup.shortTasks, counterName));
+        double avgLarger = average(ResourceUtils.getCounterValues(taskGroup.longTasks, counterName));
+        response.suggestionResults = getDeviationSuggest(avgSmaller, avgLarger);
+        return response;
+    }
+
+    /** Returns the arithmetic mean of {@code values}, or 0 when there are no values. */
+    private static double average(double[] values) {
+        if (values == null || values.length == 0) {
+            return 0;
+        }
+        return new DescriptiveStatistics().getMeanImpl().evaluate(values);
+    }
+
+    /**
+     * Builds the single-element result list for the given averages. The deviation is
+     * {@code avgLarger / avgSmaller}, with a non-positive {@code avgSmaller} replaced by 1;
+     * the suggestion text stays {@code null} unless the deviation exceeds the threshold.
+     */
+    private List<MRTaskExecutionResponse.SuggestionResult> getDeviationSuggest(double avgSmaller, double avgLarger) {
+        if (avgSmaller <= 0) {
+            avgSmaller = 1;
+        }
+        double deviation = avgLarger / avgSmaller;
+        String suggestName = String.format(DEVIATION_SUGGEST_FORMAT, counterName.getName());
+        String suggestion = null;
+        if (deviation > threshold) {
+            suggestion = String.format(DATA_SKEW_SUGGESTION_FORMAT, counterName.getName(), threshold, avgLarger, avgSmaller);
+        }
+        List<MRTaskExecutionResponse.SuggestionResult> suggestionResults = new ArrayList<>();
+        suggestionResults.add(new MRTaskExecutionResponse.SuggestionResult(suggestName, deviation, suggestion));
+        return suggestionResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
new file mode 100644
index 0000000..9fd54b0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+
+/**
+ * GC suggestion function bound to the {@code MapGC} suggestion type; it analyses the task
+ * group registered under the MAP task type.
+ */
+public class MapGCFunc extends AbstractGCFunc {
+
+    /** Creates the function without an explicit threshold (the superclass decides). */
+    public MapGCFunc() {
+        super(Constants.SuggestionType.MapGC);
+    }
+
+    /**
+     * Creates the function with a caller-supplied threshold.
+     *
+     * @param threshold deviation threshold forwarded to the superclass
+     */
+    public MapGCFunc(double threshold) {
+        super(Constants.SuggestionType.MapGC, threshold);
+    }
+
+    /**
+     * Looks up the group stored under the MAP task type name.
+     *
+     * @param tasks grouped tasks of a job
+     * @return whatever {@code tasksGroupByType} holds for the MAP key
+     */
+    @Override
+    protected MRTaskExecutionResponse.TaskGroup getTasks(MRTaskExecutionResponse.TaskGroupResponse tasks) {
+        final String mapTypeKey = Constants.TaskType.MAP.toString();
+        return tasks.tasksGroupByType.get(mapTypeKey);
+    }
+}